HADOOP-12666. Support Microsoft Azure Data Lake as a file system in Hadoop. Contributed by Vishwajeet Dusane.
(cherry picked from commit 9581fb715cbc8a6ad28566e83c6d0242a7306688) Conflicts: hadoop-tools/hadoop-tools-dist/pom.xml hadoop-tools/pom.xml Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a8f03ef7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a8f03ef7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a8f03ef7 Branch: refs/heads/branch-2 Commit: a8f03ef7ea8163c00ce5d72a4e1c77284befe5aa Parents: 6350e4b Author: Chris Nauroth <cnaur...@apache.org> Authored: Thu Jun 9 14:49:05 2016 -0700 Committer: Chris Nauroth <cnaur...@apache.org> Committed: Thu Jun 9 14:49:05 2016 -0700 ---------------------------------------------------------------------- .../src/main/resources/core-default.xml | 60 + .../conf/TestCommonConfigurationFields.java | 6 + hadoop-project/src/site/site.xml | 2 + .../dev-support/findbugs-exclude.xml | 24 + hadoop-tools/hadoop-azure-datalake/pom.xml | 180 +++ .../main/java/org/apache/hadoop/fs/adl/Adl.java | 52 + .../org/apache/hadoop/fs/adl/AdlFileSystem.java | 41 + ...hedRefreshTokenBasedAccessTokenProvider.java | 135 +++ .../hadoop/fs/adl/oauth2/package-info.java | 23 + .../org/apache/hadoop/fs/adl/package-info.java | 23 + .../org/apache/hadoop/hdfs/web/ADLConfKeys.java | 61 + .../apache/hadoop/hdfs/web/BufferManager.java | 180 +++ .../web/PrivateAzureDataLakeFileSystem.java | 1108 ++++++++++++++++++ ...hedRefreshTokenBasedAccessTokenProvider.java | 37 + .../hadoop/hdfs/web/oauth2/package-info.java | 24 + .../apache/hadoop/hdfs/web/package-info.java | 25 + .../hadoop/hdfs/web/resources/ADLFlush.java | 49 + .../hdfs/web/resources/ADLGetOpParam.java | 96 ++ .../hdfs/web/resources/ADLPostOpParam.java | 97 ++ .../hdfs/web/resources/ADLPutOpParam.java | 94 ++ .../hdfs/web/resources/ADLVersionInfo.java | 51 + .../web/resources/AppendADLNoRedirectParam.java | 45 + .../web/resources/CreateADLNoRedirectParam.java | 44 + .../hadoop/hdfs/web/resources/LeaseParam.java | 53 + .../web/resources/ReadADLNoRedirectParam.java | 44 + .../hadoop/hdfs/web/resources/package-info.java | 27 + .../src/site/markdown/index.md | 219 ++++ ...hedRefreshTokenBasedAccessTokenProvider.java | 147 +++ hadoop-tools/hadoop-tools-dist/pom.xml | 6 + hadoop-tools/pom.xml | 1 + 30 files changed, 2954 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 490f1de..41bf6d8 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2213,4 +2213,64 @@ needs to be specified in net.topology.script.file.name. </description> </property> + + + <!-- Azure Data Lake File System Configurations --> + + <property> + <name>adl.feature.override.readahead</name> + <value>true</value> + <description> + Enables read aheads in the ADL client, the feature is used to + improve read throughput. + This works in conjunction with the value set in + adl.feature.override.readahead.max.buffersize. + When set to false the read ahead feature is turned off. + Default : True if not configured. 
+ </description>
+ </property>
+
+ <property>
+ <name>adl.feature.override.readahead.max.buffersize</name>
+ <value>8388608</value>
+ <description>
+ Defines the maximum buffer size used to cache read ahead data; the
+ buffer is allocated per process. Applicable only when
+ adl.feature.override.readahead is set to true.
+ Default : 8388608 bytes (8 MB) if not configured.
+ </description>
+ </property>
+
+ <property>
+ <name>adl.feature.override.readahead.max.concurrent.connection</name>
+ <value>2</value>
+ <description>
+ Defines the maximum number of concurrent network connections that can
+ be established for read ahead. If the data size is 4 MB or less, only
+ 1 read connection is opened. If the data size is more than 4 MB but
+ less than 8 MB, 2 read connections are opened. For data larger than
+ 8 MB, the value set in this property takes effect. Applicable only
+ when adl.feature.override.readahead is set to true and the buffer
+ size is greater than 8 MB.
+ It is recommended to reset this property when
+ adl.feature.override.readahead.max.buffersize is less than 8 MB, to
+ gain performance. The application also has to consider the throttling
+ limit for the account before configuring a large buffer size.
+ </description>
+ </property>
+
+ <property>
+ <name>fs.adl.impl</name>
+ <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
+ </property>
+
+ <property>
+ <name>fs.AbstractFileSystem.adl.impl</name>
+ <value>org.apache.hadoop.fs.adl.Adl</value>
+ </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
index 90f7514..020474f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
@@ -102,6 +102,12 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
 xmlPrefixToSkipCompare.add("s3.");
 xmlPrefixToSkipCompare.add("s3native.");
+ // ADL properties are in a different subtree
+ // - org.apache.hadoop.hdfs.web.ADLConfKeys
+ xmlPrefixToSkipCompare.add("adl.");
+ xmlPropsToSkipCompare.add("fs.adl.impl");
+ xmlPropsToSkipCompare.add("fs.AbstractFileSystem.adl.impl");
+
 // Deprecated properties. These should eventually be removed from the
 // class.
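As an aside, the read-ahead properties added to core-default.xml above can also be overridden per job through org.apache.hadoop.conf.Configuration; a minimal sketch, with illustrative values (this snippet is not part of the patch):

    Configuration conf = new Configuration();
    conf.setBoolean("adl.feature.override.readahead", true);
    // shrink the per-process read-ahead cache to 4 MB and use one connection
    conf.setInt("adl.feature.override.readahead.max.buffersize",
        4 * 1024 * 1024);
    conf.setInt("adl.feature.override.readahead.max.concurrent.connection", 1);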
 configurationPropsToSkipCompare

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-project/src/site/site.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index 0167f0c..dd9e3e9 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -144,6 +144,8 @@
 <menu name="Hadoop Compatible File Systems" inherit="top">
 <item name="Amazon S3" href="hadoop-aws/tools/hadoop-aws/index.html"/>
 <item name="Azure Blob Storage" href="hadoop-azure/index.html"/>
+ <item name="Azure Data Lake Storage"
+ href="hadoop-azure-datalake/index.html"/>
 <item name="OpenStack Swift" href="hadoop-openstack/index.html"/>
 </menu>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
new file mode 100644
index 0000000..4fd36ef
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
@@ -0,0 +1,24 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<FindBugsFilter>
+ <!-- Buffer object is accessed within trusted code and intentionally assigned instead of array copy -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem$BatchAppendOutputStream$CommitTask"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ <Priority value="2"/>
+ </Match>
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/pom.xml b/hadoop-tools/hadoop-azure-datalake/pom.xml
new file mode 100644
index 0000000..66c874c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-project</artifactId> + <version>2.9.0-SNAPSHOT</version> + <relativePath>../../hadoop-project</relativePath> + </parent> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-azure-datalake</artifactId> + <name>Apache Hadoop Azure Data Lake support</name> + <description> + This module contains code to support integration with Azure Data Lake. + </description> + <packaging>jar</packaging> + <properties> + <okHttpVersion>2.4.0</okHttpVersion> + <minimalJsonVersion>0.9.1</minimalJsonVersion> + <file.encoding>UTF-8</file.encoding> + <downloadSources>true</downloadSources> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <findbugsXmlOutput>true</findbugsXmlOutput> + <xmlOutput>true</xmlOutput> + <excludeFilterFile> + ${basedir}/dev-support/findbugs-exclude.xml + </excludeFilterFile> + <effort>Max</effort> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-project-info-reports-plugin</artifactId> + + <configuration> + <dependencyDetailsEnabled>false</dependencyDetailsEnabled> + <dependencyLocationsEnabled>false + </dependencyLocationsEnabled> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>deplist</id> + <phase>compile</phase> + <goals> + <goal>list</goal> + </goals> + <configuration> + <!-- build a shellprofile --> + <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + + + <!-- + The following is to suppress a m2e warning in eclipse + (m2e doesn't know how to handle maven-enforcer:enforce, so we have to tell m2e to ignore it) + see: http://stackoverflow.com/questions/13040788/how-to-elimate-the-maven-enforcer-plugin-goal-enforce-is-ignored-by-m2e-wa + --> + <pluginManagement> + <plugins> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins + </groupId> + <artifactId>maven-enforcer-plugin + </artifactId> + <versionRange>[1.0.0,)</versionRange> + <goals> + <goal>enforce</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + + </build> + + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <exclusions> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + 
<artifactId>hadoop-hdfs-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.eclipsesource.minimal-json</groupId> + <artifactId>minimal-json</artifactId> + <version>0.9.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>com.squareup.okhttp</groupId> + <artifactId>mockwebserver</artifactId> + <version>2.4.0</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java new file mode 100644 index 0000000..4642d6b --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.fs.adl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DelegateToFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +/** + * Expose adl:// scheme to access ADL file system. 
+ */ +public class Adl extends DelegateToFileSystem { + + Adl(URI theUri, Configuration conf) throws IOException, URISyntaxException { + super(theUri, createDataLakeFileSystem(conf), conf, AdlFileSystem.SCHEME, + false); + } + + private static AdlFileSystem createDataLakeFileSystem(Configuration conf) { + AdlFileSystem fs = new AdlFileSystem(); + fs.setConf(conf); + return fs; + } + + /** + * @return Default port for ADL File system to communicate + */ + @Override + public final int getUriDefaultPort() { + return AdlFileSystem.DEFAULT_PORT; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java new file mode 100644 index 0000000..11e1e0b --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.fs.adl; + +import org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem; + +/** + * Expose adl:// scheme to access ADL file system. + */ +public class AdlFileSystem extends PrivateAzureDataLakeFileSystem { + + public static final String SCHEME = "adl"; + public static final int DEFAULT_PORT = 443; + + @Override + public String getScheme() { + return SCHEME; + } + + @Override + public int getDefaultPort() { + return DEFAULT_PORT; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java new file mode 100644 index 0000000..b7f3b00 --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
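The two classes above — Adl, the AbstractFileSystem binding, and AdlFileSystem below it — make the store reachable through the standard Hadoop file system APIs once fs.adl.impl and fs.AbstractFileSystem.adl.impl are set as in core-default.xml above. A minimal usage sketch, not part of the patch: the account host is a placeholder, and OAuth2 credentials are assumed to be configured separately:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    URI store = URI.create("adl://example.azuredatalakestore.net");
    FileSystem fs = FileSystem.get(store, conf); // resolved via fs.adl.impl
    FileStatus[] roots = fs.listStatus(new Path("/"));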
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.fs.adl.oauth2; + +import java.io.IOException; +import java.util.Map; +import java.util.LinkedHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.web.oauth2.AccessTokenProvider; +import org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider; +import org.apache.hadoop.hdfs.web.oauth2.PrivateCachedRefreshTokenBasedAccessTokenProvider; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY; +import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY; + +/** + * Share refresh tokens across all ADLS instances with a common client ID. The + * {@link AccessTokenProvider} can be shared across multiple instances, + * amortizing the cost of refreshing tokens. + */ +public class CachedRefreshTokenBasedAccessTokenProvider + extends PrivateCachedRefreshTokenBasedAccessTokenProvider { + + public static final String FORCE_REFRESH = "adl.force.token.refresh"; + + private static final Logger LOG = + LoggerFactory.getLogger(CachedRefreshTokenBasedAccessTokenProvider.class); + + /** Limit size of provider cache. */ + static final int MAX_PROVIDERS = 10; + @SuppressWarnings("serial") + private static final Map<String, AccessTokenProvider> CACHE = + new LinkedHashMap<String, AccessTokenProvider>() { + @Override + public boolean removeEldestEntry( + Map.Entry<String, AccessTokenProvider> e) { + return size() > MAX_PROVIDERS; + } + }; + + private AccessTokenProvider instance = null; + + /** + * Create handle for cached instance. + */ + public CachedRefreshTokenBasedAccessTokenProvider() { + } + + /** + * Gets the access token from internally cached + * ConfRefreshTokenBasedAccessTokenProvider instance. + * + * @return Valid OAuth2 access token for the user. + * @throws IOException when system error, internal server error or user error + */ + @Override + public synchronized String getAccessToken() throws IOException { + return instance.getAccessToken(); + } + + /** + * @return A cached Configuration consistent with the parameters of this + * instance. + */ + @Override + public synchronized Configuration getConf() { + return instance.getConf(); + } + + /** + * Configure cached instance. Note that the Configuration instance returned + * from subsequent calls to {@link #getConf() getConf} may be from a + * previous, cached entry. 
+ * @param conf Configuration instance + */ + @Override + public synchronized void setConf(Configuration conf) { + String id = conf.get(OAUTH_CLIENT_ID_KEY); + if (null == id) { + throw new IllegalArgumentException("Missing client ID"); + } + synchronized (CACHE) { + instance = CACHE.get(id); + if (null == instance + || conf.getBoolean(FORCE_REFRESH, false) + || replace(instance, conf)) { + instance = newInstance(); + // clone configuration + instance.setConf(new Configuration(conf)); + CACHE.put(id, instance); + LOG.debug("Created new client {}", id); + } + } + } + + AccessTokenProvider newInstance() { + return new ConfRefreshTokenBasedAccessTokenProvider(); + } + + private static boolean replace(AccessTokenProvider cached, Configuration c2) { + // ConfRefreshTokenBasedAccessTokenProvider::setConf asserts !null + final Configuration c1 = cached.getConf(); + for (String key : new String[] { + OAUTH_REFRESH_TOKEN_KEY, OAUTH_REFRESH_URL_KEY }) { + if (!c1.get(key).equals(c2.get(key))) { + // replace cached instance for this clientID + return true; + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java new file mode 100644 index 0000000..b444984 --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * public interface to expose OAuth2 authentication related features. + */ +package org.apache.hadoop.fs.adl.oauth2; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java new file mode 100644 index 0000000..98e6a77 --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
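The provider above keeps at most MAX_PROVIDERS cached instances, one per OAuth2 client ID, using the stock LinkedHashMap eviction idiom: removeEldestEntry is consulted on every put, and returning true once size() exceeds the cap drops the oldest entry. A standalone sketch of the same idiom (names illustrative); note that with the default constructor the map is insertion-ordered, so eviction is oldest-inserted rather than least-recently-used:

    import java.util.LinkedHashMap;
    import java.util.Map;

    Map<String, AccessTokenProvider> cache =
        new LinkedHashMap<String, AccessTokenProvider>() {
          @Override
          protected boolean removeEldestEntry(
              Map.Entry<String, AccessTokenProvider> eldest) {
            return size() > 10; // cap the cache, evicting the oldest entry
          }
        };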
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Supporting classes for the Azure Data Lake file system implementation.
+ */
+package org.apache.hadoop.fs.adl;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
new file mode 100644
index 0000000..a7f932f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+/**
+ * Configuration keys and default values for the ADL file system.
+ */
+public final class ADLConfKeys {
+  public static final String
+      ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN =
+      "adl.feature.override.readahead.max.concurrent.connection";
+  public static final int
+      ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT = 2;
+  public static final String ADL_WEBSDK_VERSION_KEY = "ADLFeatureSet";
+  static final String ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER =
+      "adl.debug.override.localuserasfileowner";
+  static final boolean ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT = false;
+  static final String ADL_FEATURE_REDIRECT_OFF =
+      "adl.feature.override.redirection.off";
+  static final boolean ADL_FEATURE_REDIRECT_OFF_DEFAULT = true;
+  static final String ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED =
+      "adl.feature.override.getblocklocation.locally.bundled";
+  static final boolean ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT
+      = true;
+  static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD =
+      "adl.feature.override.readahead";
+  static final boolean ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT =
+      true;
+  static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE =
+      "adl.feature.override.readahead.max.buffersize";
+
+  static final int KB = 1024;
+  static final int MB = KB * KB;
+  static final int DEFAULT_BLOCK_SIZE = 4 * MB;
+  static final int DEFAULT_EXTENT_SIZE = 256 * MB;
+  static final int DEFAULT_TIMEOUT_IN_SECONDS = 120;
+  static final int
+      ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT =
+      8 * MB;
+
+  private ADLConfKeys() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
new file mode 100644
index 0000000..350c6e7
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+/**
+ * Responsible for holding buffered data in the process. Holds one and only
+ * one buffer block in memory. The buffer block records the file it belongs
+ * to and the offset from which the block was fetched, so a backend trip is
+ * avoided whenever the same buffer block can be reused across webhdfs
+ * instances. The buffer block matters because ADL fetches a large amount
+ * of data per request from the backend (4 MB by default, configurable
+ * through core-site.xml).
+ * One observation: for compressed formats such as ORC and Avro, the buffer
+ * block does not avoid some backend calls across webhdfs instances.
+ */
+final class BufferManager {
+  private static final BufferManager BUFFER_MANAGER_INSTANCE =
+      new BufferManager();
+  private static Object lock = new Object();
+  private Buffer buffer = null;
+  private String fileName;
+
+  /**
+   * Constructor.
+   */
+  private BufferManager() {
+  }
+
+  public static Object getLock() {
+    return lock;
+  }
+
+  public static BufferManager getInstance() {
+    return BUFFER_MANAGER_INSTANCE;
+  }
+
+  /**
+   * Check whether the current buffer block belongs to the given stream and
+   * covers the given offset.
+   *
+   * @param path ADL stream path
+   * @param offset Stream offset that the caller is interested in
+   * @return True if the buffer block is available, otherwise false
+   */
+  boolean hasValidDataForOffset(String path, long offset) {
+    if (this.fileName == null) {
+      return false;
+    }
+
+    if (!this.fileName.equals(path)) {
+      return false;
+    }
+
+    if (buffer == null) {
+      return false;
+    }
+
+    if ((offset < buffer.offset) || (offset >= (buffer.offset
+        + buffer.data.length))) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Clear the buffer block.
+   */
+  void clear() {
+    buffer = null;
+  }
+
+  /**
+   * Check whether the current buffer block belongs to the given stream and
+   * fully covers the requested range. Partially available data is not
+   * supported; the data must be available exactly within the range given
+   * by the offset and size parameters.
+   *
+   * @param path Stream path
+   * @param offset Offset of the stream
+   * @param size Size of the data, from the stream offset, that the caller
+   * is interested in
+   * @return True if the data is available from the given offset and of the
+   * size the caller is interested in.
+   */
+  boolean hasData(String path, long offset, int size) {
+
+    if (!hasValidDataForOffset(path, offset)) {
+      return false;
+    }
+
+    if ((size + offset) > (buffer.data.length + buffer.offset)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Copy data out of the buffer block, starting at the requested offset.
+   * It is the caller's responsibility to check that the buffer block is of
+   * interest and that the offset is valid.
+   *
+   * @param data Byte array to be filled from the buffer block
+   * @param offset Offset from which data is to be fetched
+   */
+  void get(byte[] data, long offset) {
+    System.arraycopy(buffer.data, (int) (offset - buffer.offset), data, 0,
+        data.length);
+  }
+
+  /**
+   * Create a new empty buffer block of the given size.
+   *
+   * @param len Size of the buffer block.
+   * @return Empty byte array.
+   */
+  byte[] getEmpty(int len) {
+    return new byte[len];
+  }
+
+  /**
+   * Install a new buffer block for the stream, holding data pulled from
+   * the backend.
+   *
+   * @param data Buffer
+   * @param path Stream path to which the buffer belongs
+   * @param offset Stream offset at which the buffer starts
+   */
+  void add(byte[] data, String path, long offset) {
+    if (data == null) {
+      return;
+    }
+
+    buffer = new Buffer();
+    buffer.data = data;
+    buffer.offset = offset;
+    this.fileName = path;
+  }
+
+  /**
+   * @return Size of the buffer.
+   */
+  int getBufferSize() {
+    return buffer.data.length;
+  }
+
+  /**
+   * @return Stream offset at which the buffer starts
+   */
+  long getBufferOffset() {
+    return buffer.offset;
+  }
+
+  /**
+   * Buffer container.
+ */ + static class Buffer { + private byte[] data; + private long offset; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java new file mode 100644 index 0000000..89011d2 --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java @@ -0,0 +1,1108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.hdfs.web; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.web.resources.ADLFlush; +import org.apache.hadoop.hdfs.web.resources.ADLGetOpParam; +import org.apache.hadoop.hdfs.web.resources.ADLPostOpParam; +import org.apache.hadoop.hdfs.web.resources.ADLPutOpParam; +import org.apache.hadoop.hdfs.web.resources.ADLVersionInfo; +import org.apache.hadoop.hdfs.web.resources.AppendADLNoRedirectParam; +import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; +import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; +import org.apache.hadoop.hdfs.web.resources.CreateADLNoRedirectParam; +import org.apache.hadoop.hdfs.web.resources.CreateFlagParam; +import org.apache.hadoop.hdfs.web.resources.CreateParentParam; +import org.apache.hadoop.hdfs.web.resources.GetOpParam; +import org.apache.hadoop.hdfs.web.resources.HttpOpParam; +import org.apache.hadoop.hdfs.web.resources.LeaseParam; +import org.apache.hadoop.hdfs.web.resources.LengthParam; +import org.apache.hadoop.hdfs.web.resources.OffsetParam; +import org.apache.hadoop.hdfs.web.resources.OverwriteParam; +import org.apache.hadoop.hdfs.web.resources.Param; +import org.apache.hadoop.hdfs.web.resources.PermissionParam; +import org.apache.hadoop.hdfs.web.resources.PutOpParam; +import org.apache.hadoop.hdfs.web.resources.ReadADLNoRedirectParam; +import org.apache.hadoop.hdfs.web.resources.ReplicationParam; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Progressable; +import 
org.apache.hadoop.util.VersionInfo; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.SocketException; +import java.net.URI; +import java.net.URL; +import java.util.EnumSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Extended @see SWebHdfsFileSystem API. This class contains Azure data lake + * specific stability, Reliability and performance improvement. + * <p> + * Motivation behind PrivateAzureDataLakeFileSystem to encapsulate dependent + * implementation on org.apache.hadoop.hdfs.web package to configure query + * parameters, configuration over HTTP request send to backend .. etc. This + * class should be refactored and moved under package org.apache.hadoop.fs + * .adl once the required dependent changes are made into ASF code. + */ +public class PrivateAzureDataLakeFileSystem extends SWebHdfsFileSystem { + + public static final String SCHEME = "adl"; + + // Feature configuration + private boolean featureGetBlockLocationLocallyBundled = true; + private boolean featureConcurrentReadWithReadAhead = true; + private boolean featureRedirectOff = true; + private boolean featureFlushWhenEOF = true; + private boolean overrideOwner = false; + private int maxConcurrentConnection; + private int maxBufferSize; + private String userName; + + /** + * Constructor. + */ + public PrivateAzureDataLakeFileSystem() { + try { + userName = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + userName = "hadoop"; + } + } + + @Override + public synchronized void initialize(URI uri, Configuration conf) + throws IOException { + super.initialize(uri, conf); + overrideOwner = getConf() + .getBoolean(ADLConfKeys.ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER, + ADLConfKeys.ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT); + + featureRedirectOff = getConf() + .getBoolean(ADLConfKeys.ADL_FEATURE_REDIRECT_OFF, + ADLConfKeys.ADL_FEATURE_REDIRECT_OFF_DEFAULT); + + featureGetBlockLocationLocallyBundled = getConf() + .getBoolean(ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED, + ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT); + + featureConcurrentReadWithReadAhead = getConf(). 
+ getBoolean(ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD, + ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT); + + maxBufferSize = getConf().getInt( + ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE, + ADLConfKeys + .ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT); + + maxConcurrentConnection = getConf().getInt( + ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN, + ADLConfKeys + .ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT); + } + + @VisibleForTesting + protected boolean isFeatureGetBlockLocationLocallyBundled() { + return featureGetBlockLocationLocallyBundled; + } + + @VisibleForTesting + protected boolean isFeatureConcurrentReadWithReadAhead() { + return featureConcurrentReadWithReadAhead; + } + + @VisibleForTesting + protected boolean isFeatureRedirectOff() { + return featureRedirectOff; + } + + @VisibleForTesting + protected boolean isOverrideOwnerFeatureOn() { + return overrideOwner; + } + + @VisibleForTesting + protected int getMaxBufferSize() { + return maxBufferSize; + } + + @VisibleForTesting + protected int getMaxConcurrentConnection() { + return maxConcurrentConnection; + } + + @Override + public String getScheme() { + return SCHEME; + } + + /** + * Constructing home directory locally is fine as long as Hadoop + * local user name and ADL user name relationship story is not fully baked + * yet. + * + * @return Hadoop local user home directory. + */ + @Override + public final Path getHomeDirectory() { + try { + return makeQualified(new Path( + "/user/" + UserGroupInformation.getCurrentUser().getShortUserName())); + } catch (IOException e) { + } + + return new Path("/user/" + userName); + } + + /** + * Azure data lake does not support user configuration for data replication + * hence not leaving system to query on + * azure data lake. + * + * Stub implementation + * + * @param p Not honoured + * @param replication Not honoured + * @return True hard coded since ADL file system does not support + * replication configuration + * @throws IOException No exception would not thrown in this case however + * aligning with parent api definition. + */ + @Override + public final boolean setReplication(final Path p, final short replication) + throws IOException { + return true; + } + + /** + * @param f File/Folder path + * @return FileStatus instance containing metadata information of f + * @throws IOException For any system error + */ + @Override + public FileStatus getFileStatus(Path f) throws IOException { + statistics.incrementReadOps(1); + FileStatus status = super.getFileStatus(f); + + if (overrideOwner) { + FileStatus proxiedStatus = new FileStatus(status.getLen(), + status.isDirectory(), status.getReplication(), status.getBlockSize(), + status.getModificationTime(), status.getAccessTime(), + status.getPermission(), userName, "hdfs", status.getPath()); + return proxiedStatus; + } else { + return status; + } + } + + /** + * Create call semantic is handled differently in case of ADL. Create + * semantics is translated to Create/Append + * semantics. + * 1. No dedicated connection to server. + * 2. Buffering is locally done, Once buffer is full or flush is invoked on + * the by the caller. All the pending + * data is pushed to ADL as APPEND operation code. + * 3. On close - Additional call is send to server to close the stream, and + * release lock from the stream. + * + * Necessity of Create/Append semantics is + * 1. ADL backend server does not allow idle connection for longer duration + * . 
In case of slow writer scenario, + * observed connection timeout/Connection reset causing occasional job + * failures. + * 2. Performance boost to jobs which are slow writer, avoided network latency + * 3. ADL equally better performing with multiple of 4MB chunk as append + * calls. + * + * @param f File path + * @param permission Access permission for the newly created file + * @param overwrite Remove existing file and recreate new one if true + * otherwise throw error if file exist + * @param bufferSize Buffer size, ADL backend does not honour + * @param replication Replication count, ADL backend does not honour + * @param blockSize Block size, ADL backend does not honour + * @param progress Progress indicator + * @return FSDataOutputStream OutputStream on which application can push + * stream of bytes + * @throws IOException when system error, internal server error or user error + */ + @Override + public FSDataOutputStream create(final Path f, final FsPermission permission, + final boolean overwrite, final int bufferSize, final short replication, + final long blockSize, final Progressable progress) throws IOException { + statistics.incrementWriteOps(1); + + return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize, + new PermissionParam(applyUMask(permission)), + new OverwriteParam(overwrite), new BufferSizeParam(bufferSize), + new ReplicationParam(replication), new BlockSizeParam(blockSize), + new ADLVersionInfo(VersionInfo.getVersion())), statistics) { + }; + } + + @Override + public FSDataOutputStream createNonRecursive(final Path f, + final FsPermission permission, final EnumSet<CreateFlag> flag, + final int bufferSize, final short replication, final long blockSize, + final Progressable progress) throws IOException { + statistics.incrementWriteOps(1); + + String leaseId = java.util.UUID.randomUUID().toString(); + return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize, + new PermissionParam(applyUMask(permission)), new CreateFlagParam(flag), + new CreateParentParam(false), new BufferSizeParam(bufferSize), + new ReplicationParam(replication), new LeaseParam(leaseId), + new BlockSizeParam(blockSize), + new ADLVersionInfo(VersionInfo.getVersion())), statistics) { + }; + } + + /** + * Since defined as private in parent class, redefined to pass through + * Create api implementation. + * + * @param permission + * @return FsPermission list + */ + private FsPermission applyUMask(FsPermission permission) { + FsPermission fsPermission = permission; + if (fsPermission == null) { + fsPermission = FsPermission.getDefault(); + } + return fsPermission.applyUMask(FsPermission.getUMask(getConf())); + } + + /** + * Open call semantic is handled differently in case of ADL. Instead of + * network stream is returned to the user, + * Overridden FsInputStream is returned. + * + * 1. No dedicated connection to server. + * 2. Process level concurrent read ahead Buffering is done, This allows + * data to be available for caller quickly. + * 3. Number of byte to read ahead is configurable. + * + * Advantage of Process level concurrent read ahead Buffering semantics is + * 1. ADL backend server does not allow idle connection for longer duration + * . In case of slow reader scenario, + * observed connection timeout/Connection reset causing occasional job + * failures. + * 2. Performance boost to jobs which are slow reader, avoided network latency + * 3. Compressed format support like ORC, and large data files gains the + * most out of this implementation. 
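The create/append translation described above is transparent to callers, who keep using the normal FileSystem write API. A brief sketch, not part of the patch (fs is an AdlFileSystem instance as obtained in the earlier sketch; path and payload are placeholders):

    byte[] payload = new byte[1024];
    try (FSDataOutputStream out =
        fs.create(new Path("/tmp/example.dat"), true)) {
      out.write(payload); // buffered locally by BatchAppendOutputStream
      out.flush();        // pending buffer pushed to ADL as one APPEND call
    }                     // close() sends the final call that closes the
                          // stream and releases the lock on it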
+ * + * Read ahead feature is configurable. + * + * @param f File path + * @param buffersize Buffer size + * @return FSDataInputStream InputStream on which application can read + * stream of bytes + * @throws IOException when system error, internal server error or user error + */ + @Override + public FSDataInputStream open(final Path f, final int buffersize) + throws IOException { + statistics.incrementReadOps(1); + + final HttpOpParam.Op op = GetOpParam.Op.OPEN; + // use a runner so the open can recover from an invalid token + FsPathConnectionRunner runner = null; + + if (featureConcurrentReadWithReadAhead) { + URL url = this.toUrl(op, f, new BufferSizeParam(buffersize), + new ReadADLNoRedirectParam(true), + new ADLVersionInfo(VersionInfo.getVersion())); + + BatchByteArrayInputStream bb = new BatchByteArrayInputStream(url, f, + maxBufferSize, maxConcurrentConnection); + + FSDataInputStream fin = new FSDataInputStream(bb); + return fin; + } else { + if (featureRedirectOff) { + runner = new FsPathConnectionRunner(ADLGetOpParam.Op.OPEN, f, + new BufferSizeParam(buffersize), new ReadADLNoRedirectParam(true), + new ADLVersionInfo(VersionInfo.getVersion())); + } else { + runner = new FsPathConnectionRunner(op, f, + new BufferSizeParam(buffersize)); + } + + return new FSDataInputStream( + new OffsetUrlInputStream(new UnresolvedUrlOpener(runner), + new OffsetUrlOpener(null))); + } + } + + /** + * @param f File/Folder path + * @return FileStatus array list + * @throws IOException For system error + */ + @Override + public FileStatus[] listStatus(final Path f) throws IOException { + FileStatus[] fileStatuses = super.listStatus(f); + for (int i = 0; i < fileStatuses.length; i++) { + if (overrideOwner) { + fileStatuses[i] = new FileStatus(fileStatuses[i].getLen(), + fileStatuses[i].isDirectory(), fileStatuses[i].getReplication(), + fileStatuses[i].getBlockSize(), + fileStatuses[i].getModificationTime(), + fileStatuses[i].getAccessTime(), fileStatuses[i].getPermission(), + userName, "hdfs", fileStatuses[i].getPath()); + } + } + return fileStatuses; + } + + @Override + public BlockLocation[] getFileBlockLocations(final FileStatus status, + final long offset, final long length) throws IOException { + if (status == null) { + return null; + } + + if (featureGetBlockLocationLocallyBundled) { + if ((offset < 0) || (length < 0)) { + throw new IllegalArgumentException("Invalid start or len parameter"); + } + + if (status.getLen() < offset) { + return new BlockLocation[0]; + } + + final String[] name = {"localhost"}; + final String[] host = {"localhost"}; + long blockSize = ADLConfKeys.DEFAULT_EXTENT_SIZE; // Block size must be + // non zero + int numberOfLocations = + (int) (length / blockSize) + ((length % blockSize == 0) ? 
0 : 1); + BlockLocation[] locations = new BlockLocation[numberOfLocations]; + for (int i = 0; i < locations.length; i++) { + long currentOffset = offset + (i * blockSize); + long currentLength = Math + .min(blockSize, offset + length - currentOffset); + locations[i] = new BlockLocation(name, host, currentOffset, + currentLength); + } + + return locations; + } else { + return getFileBlockLocations(status.getPath(), offset, length); + } + } + + @Override + public BlockLocation[] getFileBlockLocations(final Path p, final long offset, + final long length) throws IOException { + statistics.incrementReadOps(1); + + if (featureGetBlockLocationLocallyBundled) { + FileStatus fileStatus = getFileStatus(p); + return getFileBlockLocations(fileStatus, offset, length); + } else { + return super.getFileBlockLocations(p, offset, length); + } + } + + enum StreamState { + Initial, + DataCachedInLocalBuffer, + StreamEnd + } + + class BatchAppendOutputStream extends OutputStream { + private Path fsPath; + private Param<?, ?>[] parameters; + private byte[] data = null; + private int offset = 0; + private long length = 0; + private boolean eof = false; + private boolean hadError = false; + private byte[] dataBuffers = null; + private int bufSize = 0; + private boolean streamClosed = false; + + public BatchAppendOutputStream(Path path, int bufferSize, + Param<?, ?>... param) throws IOException { + if (bufferSize < (ADLConfKeys.DEFAULT_BLOCK_SIZE)) { + bufSize = ADLConfKeys.DEFAULT_BLOCK_SIZE; + } else { + bufSize = bufferSize; + } + + this.fsPath = path; + this.parameters = param; + this.data = getBuffer(); + FSDataOutputStream createStream = null; + try { + if (featureRedirectOff) { + CreateADLNoRedirectParam skipRedirect = new CreateADLNoRedirectParam( + true); + Param<?, ?>[] tmpParam = featureFlushWhenEOF ? 
+ new Param<?, ?>[param.length + 2] : + new Param<?, ?>[param.length + 1]; + System.arraycopy(param, 0, tmpParam, 0, param.length); + tmpParam[param.length] = skipRedirect; + if (featureFlushWhenEOF) { + tmpParam[param.length + 1] = new ADLFlush(false); + } + createStream = new FsPathOutputStreamRunner(ADLPutOpParam.Op.CREATE, + fsPath, 1, tmpParam).run(); + } else { + createStream = new FsPathOutputStreamRunner(PutOpParam.Op.CREATE, + fsPath, 1, param).run(); + } + } finally { + if (createStream != null) { + createStream.close(); + } + } + } + + @Override + public final synchronized void write(int b) throws IOException { + if (streamClosed) { + throw new IOException(fsPath + " stream object is closed."); + } + + if (offset == (data.length)) { + flush(); + } + + data[offset] = (byte) b; + offset++; + + // Statistics will get incremented again as part of the batch updates, + // decrement here to avoid double value + if (statistics != null) { + statistics.incrementBytesWritten(-1); + } + } + + @Override + public final synchronized void write(byte[] buf, int off, int len) + throws IOException { + if (streamClosed) { + throw new IOException(fsPath + " stream object is closed."); + } + + int bytesToWrite = len; + int localOff = off; + int localLen = len; + if (localLen >= data.length) { + // Flush data that is already in our internal buffer + flush(); + + // Keep committing data until we have less than our internal buffers + // length left + do { + try { + commit(buf, localOff, data.length, eof); + } catch (IOException e) { + hadError = true; + throw e; + } + localOff += data.length; + localLen -= data.length; + } while (localLen >= data.length); + } + + // At this point, we have less than data.length left to copy from users + // buffer + if (offset + localLen >= data.length) { + // Users buffer has enough data left to fill our internal buffer + int bytesToCopy = data.length - offset; + System.arraycopy(buf, localOff, data, offset, bytesToCopy); + offset += bytesToCopy; + + // Flush our internal buffer + flush(); + localOff += bytesToCopy; + localLen -= bytesToCopy; + } + + if (localLen > 0) { + // Simply copy the remainder from the users buffer into our internal + // buffer + System.arraycopy(buf, localOff, data, offset, localLen); + offset += localLen; + } + + // Statistics will get incremented again as part of the batch updates, + // decrement here to avoid double value + if (statistics != null) { + statistics.incrementBytesWritten(-bytesToWrite); + } + } + + @Override + public final synchronized void flush() throws IOException { + if (streamClosed) { + throw new IOException(fsPath + " stream object is closed."); + } + + if (offset > 0) { + try { + commit(data, 0, offset, eof); + } catch (IOException e) { + hadError = true; + throw e; + } + } + + offset = 0; + } + + @Override + public final synchronized void close() throws IOException { + // Stream is closed earlier, return quietly. + if(streamClosed) { + return; + } + + if (featureRedirectOff) { + eof = true; + } + + boolean flushedSomething = false; + if (hadError) { + // No point proceeding further since the error has occurred and + // stream would be required to upload again. 
+ streamClosed = true; + return; + } else { + flushedSomething = offset > 0; + try { + flush(); + } finally { + streamClosed = true; + } + } + + if (featureRedirectOff) { + // If we didn't flush anything from our internal buffer, we have to + // call the service again + // with an empty payload and flush=true in the url + if (!flushedSomething) { + try { + commit(null, 0, ADLConfKeys.KB, true); + } finally { + streamClosed = true; + } + } + } + } + + private void commit(byte[] buffer, int off, int len, boolean endOfFile) + throws IOException { + OutputStream out = null; + try { + if (featureRedirectOff) { + AppendADLNoRedirectParam skipRedirect = new AppendADLNoRedirectParam( + true); + Param<?, ?>[] tmpParam = featureFlushWhenEOF ? + new Param<?, ?>[parameters.length + 3] : + new Param<?, ?>[parameters.length + 1]; + System.arraycopy(parameters, 0, tmpParam, 0, parameters.length); + tmpParam[parameters.length] = skipRedirect; + if (featureFlushWhenEOF) { + tmpParam[parameters.length + 1] = new ADLFlush(endOfFile); + tmpParam[parameters.length + 2] = new OffsetParam(length); + } + + out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath, + len, tmpParam).run(); + } else { + out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath, + len, parameters).run(); + } + + if (buffer != null) { + out.write(buffer, off, len); + length += len; + } + } finally { + if (out != null) { + out.close(); + } + } + } + + private byte[] getBuffer() { + // Switch between the first and second buffer + dataBuffers = new byte[bufSize]; + return dataBuffers; + } + } + + /** + * Read data from backend in chunks instead of persistent connection. This + * is to avoid slow reader causing socket + * timeout. + */ + protected class BatchByteArrayInputStream extends FSInputStream { + + private static final int SIZE4MB = 4 * 1024 * 1024; + private final URL runner; + private byte[] data = null; + private long validDataHoldingSize = 0; + private int bufferOffset = 0; + private long currentFileOffset = 0; + private long nextFileOffset = 0; + private long fileSize = 0; + private StreamState state = StreamState.Initial; + private int maxBufferSize; + private int maxConcurrentConnection; + private Path fsPath; + private boolean streamIsClosed; + private Future[] subtasks = null; + + BatchByteArrayInputStream(URL url, Path p, int bufferSize, + int concurrentConnection) throws IOException { + this.runner = url; + fsPath = p; + FileStatus fStatus = getFileStatus(fsPath); + if (!fStatus.isFile()) { + throw new IOException("Cannot open the directory " + p + " for " + + "reading"); + } + fileSize = fStatus.getLen(); + this.maxBufferSize = bufferSize; + this.maxConcurrentConnection = concurrentConnection; + this.streamIsClosed = false; + } + + @Override + public synchronized final int read(long position, byte[] buffer, int offset, + int length) throws IOException { + if (streamIsClosed) { + throw new IOException("Stream already closed"); + } + long oldPos = this.getPos(); + + int nread1; + try { + this.seek(position); + nread1 = this.read(buffer, offset, length); + } finally { + this.seek(oldPos); + } + + return nread1; + } + + @Override + public synchronized final int read() throws IOException { + if (streamIsClosed) { + throw new IOException("Stream already closed"); + } + int status = doBufferAvailabilityCheck(); + if (status == -1) { + return status; + } + int ch = data[bufferOffset++] & (0xff); + if (statistics != null) { + statistics.incrementBytesRead(1); + } + return ch; + } + + @Override + public 
+
+    @Override
+    public synchronized final void readFully(long position, byte[] buffer,
+        int offset, int length) throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+
+      super.readFully(position, buffer, offset, length);
+      if (statistics != null) {
+        statistics.incrementBytesRead(length);
+      }
+    }
+
+    @Override
+    public synchronized final int read(byte[] b, int off, int len)
+        throws IOException {
+      if (b == null) {
+        throw new IllegalArgumentException();
+      } else if (off < 0 || len < 0 || len > b.length - off) {
+        throw new IndexOutOfBoundsException();
+      } else if (len == 0) {
+        return 0;
+      }
+
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      int status = doBufferAvailabilityCheck();
+      if (status == -1) {
+        return status;
+      }
+
+      int byteRead = 0;
+      // Serve the request from the local buffer when it holds enough
+      // valid data; otherwise fall back to the byte-at-a-time path.
+      if ((long) bufferOffset + len <= validDataHoldingSize) {
+        System.arraycopy(data, bufferOffset, b, off, len);
+        bufferOffset += len;
+        byteRead = len;
+      } else {
+        byteRead = super.read(b, off, len);
+      }
+
+      if (statistics != null) {
+        statistics.incrementBytesRead(byteRead);
+      }
+
+      return byteRead;
+    }
+
+    private int doBufferAvailabilityCheck() throws IOException {
+      if (state == StreamState.Initial) {
+        validDataHoldingSize = fill(nextFileOffset);
+      }
+
+      long dataReloadSize = 0;
+      switch ((int) validDataHoldingSize) {
+      case -1:
+        state = StreamState.StreamEnd;
+        return -1;
+      case 0:
+        dataReloadSize = fill(nextFileOffset);
+        if (dataReloadSize <= 0) {
+          state = StreamState.StreamEnd;
+          return (int) dataReloadSize;
+        } else {
+          validDataHoldingSize = dataReloadSize;
+        }
+        break;
+      default:
+        break;
+      }
+
+      if (bufferOffset >= validDataHoldingSize) {
+        dataReloadSize = fill(nextFileOffset);
+      }
+
+      if (bufferOffset >= ((dataReloadSize == 0) ?
+          validDataHoldingSize : dataReloadSize)) {
+        state = StreamState.StreamEnd;
+        return -1;
+      }
+
+      validDataHoldingSize = ((dataReloadSize == 0) ?
+          validDataHoldingSize : dataReloadSize);
+      state = StreamState.DataCachedInLocalBuffer;
+      return 0;
+    }
+
+    private long fill(final long off) throws IOException {
+      if (state == StreamState.StreamEnd) {
+        return -1;
+      }
+
+      if (fileSize <= off) {
+        state = StreamState.StreamEnd;
+        return -1;
+      }
+      int len = maxBufferSize;
+      long fileOffset = 0;
+      boolean isEntireFileCached = true;
+      if (fileSize <= maxBufferSize) {
+        // The whole file fits in one buffer; cache it in its entirety.
+        len = (int) fileSize;
+        currentFileOffset = 0;
+        nextFileOffset = 0;
+      } else {
+        if (len > (fileSize - off)) {
+          len = (int) (fileSize - off);
+        }
+
+        synchronized (BufferManager.getLock()) {
+          if (BufferManager.getInstance()
+              .hasValidDataForOffset(fsPath.toString(), off)) {
+            len = (int) (
+                BufferManager.getInstance().getBufferOffset() + BufferManager
+                    .getInstance().getBufferSize() - (int) off);
+          }
+        }
+
+        if (len <= 0) {
+          len = maxBufferSize;
+        }
+        fileOffset = off;
+        isEntireFileCached = false;
+      }
+
+      data = null;
+      BufferManager bm = BufferManager.getInstance();
+      data = bm.getEmpty(len);
+      boolean fetchDataOverNetwork = false;
+      // Serve from the shared buffer cache when it already holds the
+      // requested range; otherwise fetch from the backend.
+      synchronized (BufferManager.getLock()) {
+        if (bm.hasData(fsPath.toString(), fileOffset, len)) {
+          try {
+            bm.get(data, fileOffset);
+            validDataHoldingSize = data.length;
+            currentFileOffset = fileOffset;
+          } catch (ArrayIndexOutOfBoundsException e) {
+            fetchDataOverNetwork = true;
+          }
+        } else {
+          fetchDataOverNetwork = true;
+        }
+      }
+
+      if (fetchDataOverNetwork) {
+        int splitSize = getSplitSize(len);
+        try {
+          validDataHoldingSize = fillDataConcurrently(data, len, fileOffset,
+              splitSize);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted filling buffer", e);
+        }
+
+        synchronized (BufferManager.getLock()) {
+          bm.add(data, fsPath.toString(), fileOffset);
+        }
+        currentFileOffset = nextFileOffset;
+      }
+
+      nextFileOffset += validDataHoldingSize;
+      state = StreamState.DataCachedInLocalBuffer;
+      bufferOffset = isEntireFileCached ? (int) off : 0;
+      return validDataHoldingSize;
+    }
+
+    int getSplitSize(int size) {
+      if (size <= SIZE4MB) {
+        return 1;
+      }
+
+      // Reads larger than the configured maximum are not practical;
+      // clamp to the maximum buffer size.
+      if (size > maxBufferSize) {
+        size = maxBufferSize;
+      }
+
+      // One split per 4 MB chunk, bounded by the allowed number of
+      // concurrent connections. For example, a 16 MB fill with
+      // maxConcurrentConnection = 2 yields 2 splits of 8 MB each.
+      int equalBufferSplit = Math.max(size / SIZE4MB, 1);
+      int splitSize = Math.min(equalBufferSplit, maxConcurrentConnection);
+      return splitSize;
+    }
+
+    @Override
+    public synchronized final void seek(long pos) throws IOException {
+      if (pos < 0) {
+        throw new IOException("Bad offset, cannot seek to " + pos);
+      }
+
+      BufferManager bm = BufferManager.getInstance();
+      synchronized (BufferManager.getLock()) {
+        if (bm.hasValidDataForOffset(fsPath.toString(), pos)) {
+          state = StreamState.DataCachedInLocalBuffer;
+        } else {
+          state = StreamState.Initial;
+        }
+      }
+
+      long availableBytes = currentFileOffset + validDataHoldingSize;
+
+      // Check whether this position falls within the buffered data
+      if (pos < currentFileOffset || availableBytes <= 0) {
+        validDataHoldingSize = 0;
+        currentFileOffset = pos;
+        nextFileOffset = pos;
+        bufferOffset = 0;
+        return;
+      }
+
+      if (pos < availableBytes && pos >= currentFileOffset) {
+        state = StreamState.DataCachedInLocalBuffer;
+        bufferOffset = (int) (pos - currentFileOffset);
+      } else {
+        validDataHoldingSize = 0;
+        currentFileOffset = pos;
+        nextFileOffset = pos;
+        bufferOffset = 0;
+      }
+    }
+
+    @Override
+    public synchronized final long getPos() throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      return bufferOffset + currentFileOffset;
+    }
+
+    @Override
+    public synchronized final int available() throws IOException {
+      if (streamIsClosed) {
+        throw new IOException("Stream already closed");
+      }
+      return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public final boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    private int fillDataConcurrently(byte[] byteArray, int length,
+        long globalOffset, int splitSize)
+        throws IOException, InterruptedException {
+      ExecutorService executor = Executors.newFixedThreadPool(splitSize);
+      subtasks = new Future[splitSize];
+      for (int i = 0; i < splitSize; i++) {
+        int offset = i * (length / splitSize);
+        // The last task also picks up the remainder bytes.
+        int splitLength = (splitSize == (i + 1)) ?
+            (length / splitSize) + (length % splitSize) :
+            (length / splitSize);
+        subtasks[i] = executor.submit(
+            new BackgroundReadThread(byteArray, offset, splitLength,
+                globalOffset + offset));
+      }
+
+      executor.shutdown();
+      // Wait until all tasks have finished
+      executor.awaitTermination(ADLConfKeys.DEFAULT_TIMEOUT_IN_SECONDS,
+          TimeUnit.SECONDS);
+
+      int totalBytePainted = 0;
+      for (int i = 0; i < splitSize; ++i) {
+        try {
+          totalBytePainted += (Integer) subtasks[i].get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException(e.getCause());
+        } catch (ExecutionException e) {
+          // Unlike the interrupted case there is no interrupt status to
+          // restore; surface the underlying failure.
+          throw new IOException(e.getCause());
+        }
+      }
+
+      if (totalBytePainted != length) {
+        throw new IOException("Expected " + length + " bytes, got "
+            + totalBytePainted + " bytes");
+      }
+
+      return totalBytePainted;
+    }
+
+    @Override
+    public synchronized final void close() throws IOException {
+      synchronized (BufferManager.getLock()) {
+        BufferManager.getInstance().clear();
+      }
+      // TODO: the underlying stream and connection are not closed here;
+      // only the shared buffer cache is cleared.
+      // Mark the stream closed; it cannot be used once closed.
+      streamIsClosed = true;
+    }
+
+    /**
+     * Reads data from the ADL backend starting at the specified global
+     * offset, for up to the given length. The data read is copied into
+     * the buffer array starting at the specified offset.
+     *
+     * @param buffer       Destination buffer for data read from the ADL
+     *                     backend.
+     * @param offset       Offset within the buffer at which to store the
+     *                     data.
+     * @param length       Number of bytes to read from the ADL backend.
+     * @param globalOffset Offset within the file at which to start
+     *                     reading.
+     * @return Number of bytes read from the ADL backend
+     * @throws IOException For any intermittent server issues or internal
+     *                     failures.
+     */
+    private int fillUpData(byte[] buffer, int offset, int length,
+        long globalOffset) throws IOException {
+      int totalBytesRead = 0;
+      final URL offsetUrl = new URL(
+          runner + "&" + new OffsetParam(String.valueOf(globalOffset)) + "&"
+              + new LengthParam(String.valueOf(length)));
+      HttpURLConnection conn = new URLRunner(GetOpParam.Op.OPEN, offsetUrl,
+          true).run();
+      InputStream in = conn.getInputStream();
+      try {
+        int bytesRead = 0;
+        while ((bytesRead = in.read(buffer, offset + totalBytesRead,
+            length - totalBytesRead)) > 0) {
+          totalBytesRead += bytesRead;
+        }
+
+        // The InputStream must be fully consumed to enable HTTP keep-alive
+        if (bytesRead == 0) {
+          // The EOF marker has not been read yet; consume it so the
+          // connection can be reused.
+          if (in.read() != -1) {
+            throw new SocketException(
+                "Server returned more than requested data.");
+          }
+        }
+      } finally {
+        in.close();
+        conn.disconnect();
+      }
+
+      return totalBytesRead;
+    }
+
+    private class BackgroundReadThread implements Callable<Integer> {
+
+      private final byte[] data;
+      private int offset;
+      private int length;
+      private long globalOffset;
+
+      BackgroundReadThread(byte[] buffer, int off, int size, long position) {
+        this.data = buffer;
+        this.offset = off;
+        this.length = size;
+        this.globalOffset = position;
+      }
+
+      public Integer call() throws IOException {
+        return fillUpData(data, offset, length, globalOffset);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
new file mode 100644
index 0000000..d7dce25
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Exposes AccessTokenProvider publicly so that it can be extended in the
+ * com.microsoft.azure.datalake package. The extended version caches the
+ * token for the lifetime of the process to improve performance.
+ */
+@Private
+@Unstable
+public abstract class PrivateCachedRefreshTokenBasedAccessTokenProvider
+    extends AccessTokenProvider {
+
+  // visibility workaround
+
+}
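To make the caching hook above concrete, here is a minimal sketch of the kind of subclass it enables. It is illustrative only and not part of this commit: the class name and the refreshTokenFromService() helper are invented, and it assumes the abstract method on AccessTokenProvider is String getAccessToken() throws IOException. A production provider would also track token expiry rather than caching a single string for the life of the process.

    package com.microsoft.azure.datalake;

    import java.io.IOException;

    import org.apache.hadoop.hdfs.web.oauth2.PrivateCachedRefreshTokenBasedAccessTokenProvider;

    /**
     * Hypothetical process-wide caching provider; names and wiring are
     * illustrative only.
     */
    public class ProcessCachedTokenProvider
        extends PrivateCachedRefreshTokenBasedAccessTokenProvider {

      // Single token cached for the life of the process; volatile plus
      // double-checked locking is safe here because String is immutable.
      private static volatile String cachedToken;

      @Override
      public String getAccessToken() throws IOException {
        if (cachedToken == null) {
          synchronized (ProcessCachedTokenProvider.class) {
            if (cachedToken == null) {
              cachedToken = refreshTokenFromService();
            }
          }
        }
        return cachedToken;
      }

      private String refreshTokenFromService() throws IOException {
        // Placeholder: a real provider would exchange the configured
        // refresh token at the OAuth2 endpoint and return the result.
        return "access-token-for-illustration";
      }
    }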
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
new file mode 100644
index 0000000..7a9dffa
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * OAuth2 token management support for the Azure Data Lake file system,
+ * extending the WebHDFS OAuth2 classes in this package.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
new file mode 100644
index 0000000..1cc8273
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Support for reading and writing files on the Azure Data Lake file
+ * system. This implementation is derived from the WebHDFS specification.
+ */
+package org.apache.hadoop.hdfs.web;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
new file mode 100644
index 0000000..b76aaaa
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+/**
+ * Query parameter to notify the backend server that all the data has
+ * been pushed over the stream.
+ *
+ * Used by the Create and Append operations.
+ */
+public class ADLFlush extends BooleanParam {
+  /**
+   * Parameter name.
+   */
+  public static final String NAME = "flush";
+
+  private static final Domain DOMAIN = new Domain(NAME);
+
+  /**
+   * Constructor.
+   *
+   * @param value the parameter value.
+   */
+  public ADLFlush(final Boolean value) {
+    super(DOMAIN, value);
+  }
+
+  @Override
+  public final String getName() {
+    return NAME;
+  }
+}
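To make the flush semantics concrete, a small usage sketch follows. It is illustrative only, not part of the commit; the demo class is invented, and the printed output assumes the inherited WebHDFS Param#toString() renders the usual name=value pair.

    import org.apache.hadoop.hdfs.web.resources.ADLFlush;

    public class AdlFlushParamDemo {
      public static void main(String[] args) {
        // Intermediate appends carry flush=false; the final append (or
        // the empty commit issued by close()) carries flush=true so the
        // server persists the data.
        ADLFlush intermediate = new ADLFlush(false);
        ADLFlush finalFlush = new ADLFlush(true);

        System.out.println(intermediate); // expected: flush=false
        System.out.println(finalFlush);   // expected: flush=true
      }
    }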
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8f03ef7/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
new file mode 100644
index 0000000..6b3708f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+import java.net.HttpURLConnection;
+
+/**
+ * Extended WebHDFS GetOpParam that suppresses the redirect operation for
+ * Azure Data Lake storage.
+ */
+public class ADLGetOpParam extends HttpOpParam<ADLGetOpParam.Op> {
+  private static final Domain<Op> DOMAIN = new Domain<Op>(NAME, Op.class);
+
+  /**
+   * Constructor.
+   *
+   * @param str a string representation of the parameter value.
+   */
+  public ADLGetOpParam(final String str) {
+    super(DOMAIN, DOMAIN.parse(str));
+  }
+
+  @Override
+  public final String getName() {
+    return NAME;
+  }
+
+  /**
+   * Get operations.
+   */
+  public enum Op implements HttpOpParam.Op {
+    OPEN(false, HttpURLConnection.HTTP_OK);
+
+    private final boolean redirect;
+    private final int expectedHttpResponseCode;
+    private final boolean requireAuth;
+
+    Op(final boolean doRedirect, final int expectHttpResponseCode) {
+      this(doRedirect, expectHttpResponseCode, false);
+    }
+
+    Op(final boolean doRedirect, final int expectHttpResponseCode,
+        final boolean doRequireAuth) {
+      this.redirect = doRedirect;
+      this.expectedHttpResponseCode = expectHttpResponseCode;
+      this.requireAuth = doRequireAuth;
+    }
+
+    @Override
+    public HttpOpParam.Type getType() {
+      return HttpOpParam.Type.GET;
+    }
+
+    @Override
+    public boolean getRequireAuth() {
+      return requireAuth;
+    }
+
+    @Override
+    public boolean getDoOutput() {
+      return false;
+    }
+
+    @Override
+    public boolean getRedirect() {
+      return redirect;
+    }
+
+    @Override
+    public int getExpectedHttpResponseCode() {
+      return expectedHttpResponseCode;
+    }
+
+    @Override
+    public String toQueryString() {
+      return NAME + "=" + this;
+    }
+  }
+}
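As a closing illustration of the non-redirecting read path, a sketch of what Op.OPEN exposes. It is illustrative and not part of the commit; the demo class is invented, and the "op=OPEN" rendering assumes the NAME constant inherited from HttpOpParam is the standard WebHDFS "op" parameter.

    import org.apache.hadoop.hdfs.web.resources.ADLGetOpParam;

    public class AdlOpenOpDemo {
      public static void main(String[] args) {
        ADLGetOpParam.Op op = ADLGetOpParam.Op.OPEN;
        // Unlike the stock WebHDFS OPEN, the ADL variant never redirects,
        // so the data is expected directly in the first response.
        System.out.println(op.getRedirect());                 // false
        System.out.println(op.getExpectedHttpResponseCode()); // 200
        // Rendered into the request URL, expected as "op=OPEN".
        System.out.println(op.toQueryString());
      }
    }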