This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit ecd98de9ff9a1a2a46849a6c3f26ba7468cdbac6 Author: Jeffrey L. Zeiberg <jeffrey.zeib...@asrcfederal.com> AuthorDate: Wed Apr 10 07:46:24 2019 -0400 Fix #1052 Correct distributed cache usage (#1112) * Correct distributed cache usage for Tokenfile and Property and RangePartition key cut files. --- .../mapreduce/lib/partition/RangePartitioner.java | 57 +++++++++++++--------- .../clientImpl/mapreduce/lib/ConfiguratorBase.java | 22 +++++---- .../mapreduce/lib/DistributedCacheHelper.java | 1 - .../mapreduce/partition/RangePartitioner.java | 57 +++++++++++++--------- .../hadoopImpl/mapreduce/lib/ConfiguratorBase.java | 24 ++++++--- .../accumulo/test/mapreduce/TokenFileIT.java | 30 +++++++++++- 6 files changed, 129 insertions(+), 62 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java index 6b32130..8461ff3 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/partition/RangePartitioner.java @@ -16,21 +16,22 @@ */ package org.apache.accumulo.core.client.mapreduce.lib.partition; -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.BufferedReader; -import java.io.FileInputStream; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStreamReader; import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.Base64; import java.util.Scanner; import java.util.TreeSet; +import javax.imageio.IIOException; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -90,26 +91,31 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf justification = "path provided by distributed cache framework, not user input") private synchronized Text[] getCutPoints() throws IOException { if (cutPointArray == null) { + Path path; String cutFileName = conf.get(CUTFILE_KEY); - Path[] cf = Job.getInstance().getLocalCacheFiles(); - - if (cf != null) { - for (Path path : cf) { - if (path.toUri().getPath() - .endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) { - TreeSet<Text> cutPoints = new TreeSet<>(); - try (Scanner in = new Scanner(new BufferedReader( - new InputStreamReader(new FileInputStream(path.toString()), UTF_8)))) { - while (in.hasNextLine()) - cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine()))); - } - cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]); - break; - } + File tempFile = new File(CUTFILE_KEY); + if (tempFile.exists()) { + path = new Path(CUTFILE_KEY); + } else { + path = new Path(cutFileName); + } + + if (path == null) + throw new FileNotFoundException("Cut point file not found in distributed cache"); + + TreeSet<Text> cutPoints = new TreeSet<>(); + FileSystem fs = FileSystem.get(conf); + FSDataInputStream inputStream = fs.open(path); + try (Scanner in = new Scanner(inputStream)) { + while (in.hasNextLine()) { + cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine()))); } } + + cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]); + if (cutPointArray == null) - throw new FileNotFoundException(cutFileName + " not found in distributed cache"); + throw new IIOException("Cutpoint array not properly created from file" + path.getName()); } return cutPointArray; } @@ -129,7 +135,14 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf * points that represent ranges for partitioning */ public static void setSplitFile(Job job, String file) { - URI uri = new Path(file).toUri(); + URI uri; + try { + uri = new URI(file + "#" + CUTFILE_KEY); + } catch (URISyntaxException e) { + throw new IllegalStateException( + "Unable to add split file \"" + CUTFILE_KEY + "\" to distributed cache."); + } + job.addCacheFile(uri); job.getConfiguration().set(CUTFILE_KEY, uri.getPath()); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java index acced30..bdbbdb0 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/ConfiguratorBase.java @@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull; import java.io.ByteArrayInputStream; import java.io.DataInputStream; +import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -62,6 +63,8 @@ public class ConfiguratorBase { IS_CONFIGURED, PRINCIPAL, TOKEN } + public static final String cachedFileName = "tokenfile"; + public enum TokenSource { FILE, INLINE, JOB; @@ -189,7 +192,7 @@ public class ConfiguratorBase { checkArgument(tokenFile != null, "tokenFile is null"); try { - DistributedCacheHelper.addCacheFile(new URI(tokenFile), conf); + DistributedCacheHelper.addCacheFile(new URI(tokenFile + "#" + cachedFileName), conf); } catch (URISyntaxException e) { throw new IllegalStateException( "Unable to add tokenFile \"" + tokenFile + "\" to distributed cache."); @@ -284,16 +287,17 @@ public class ConfiguratorBase { String tokenFile) { FSDataInputStream in = null; try { - URI[] uris = DistributedCacheHelper.getCacheFiles(conf); - Path path = null; - for (URI u : uris) { - if (u.toString().equals(tokenFile)) { - path = new Path(u); - } + Path path; + // See if the "tokenfile" symlink was created and try to open the file it points to by it. + File tempFile = new File(ConfiguratorBase.cachedFileName); + if (tempFile.exists()) { + path = new Path(ConfiguratorBase.cachedFileName); + } else { + path = new Path(tokenFile); } if (path == null) { - throw new IllegalArgumentException( - "Couldn't find password file called \"" + tokenFile + "\" in cache."); + throw new IllegalArgumentException("Couldn't find password file called \"" + tokenFile + + "\" in the distributed cache or the specified path in the distributed filesystem."); } FileSystem fs = FileSystem.get(conf); in = fs.open(path); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java index ead3c2a..49ddc74 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/mapreduce/lib/DistributedCacheHelper.java @@ -41,5 +41,4 @@ public class DistributedCacheHelper { public static URI[] getCacheFiles(Configuration conf) throws IOException { return org.apache.hadoop.filecache.DistributedCache.getCacheFiles(conf); } - } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java index 7304904..7e18f5c 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoop/mapreduce/partition/RangePartitioner.java @@ -16,21 +16,22 @@ */ package org.apache.accumulo.hadoop.mapreduce.partition; -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.io.BufferedReader; -import java.io.FileInputStream; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStreamReader; import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.Base64; import java.util.Scanner; import java.util.TreeSet; +import javax.imageio.IIOException; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -88,26 +89,31 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf justification = "path provided by distributed cache framework, not user input") private synchronized Text[] getCutPoints() throws IOException { if (cutPointArray == null) { + Path path; String cutFileName = conf.get(CUTFILE_KEY); - Path[] cf = Job.getInstance().getLocalCacheFiles(); - - if (cf != null) { - for (Path path : cf) { - if (path.toUri().getPath() - .endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) { - TreeSet<Text> cutPoints = new TreeSet<>(); - try (Scanner in = new Scanner(new BufferedReader( - new InputStreamReader(new FileInputStream(path.toString()), UTF_8)))) { - while (in.hasNextLine()) - cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine()))); - } - cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]); - break; - } + File tempFile = new File(CUTFILE_KEY); + if (tempFile.exists()) { + path = new Path(CUTFILE_KEY); + } else { + path = new Path(cutFileName); + } + + if (path == null) + throw new FileNotFoundException("Cut point file not found in distributed cache"); + + TreeSet<Text> cutPoints = new TreeSet<>(); + FileSystem fs = FileSystem.get(conf); + FSDataInputStream inputStream = fs.open(path); + try (Scanner in = new Scanner(inputStream)) { + while (in.hasNextLine()) { + cutPoints.add(new Text(Base64.getDecoder().decode(in.nextLine()))); } } + + cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]); + if (cutPointArray == null) - throw new FileNotFoundException(cutFileName + " not found in distributed cache"); + throw new IIOException("Cutpoint array not properly created from file" + path.getName()); } return cutPointArray; } @@ -127,7 +133,14 @@ public class RangePartitioner extends Partitioner<Text,Writable> implements Conf * points that represent ranges for partitioning */ public static void setSplitFile(Job job, String file) { - URI uri = new Path(file).toUri(); + URI uri; + try { + uri = new URI(file + "#" + CUTFILE_KEY); + } catch (URISyntaxException e) { + throw new IllegalStateException( + "Unable to add split file \"" + CUTFILE_KEY + "\" to distributed cache."); + } + job.addCacheFile(uri); job.getConfiguration().set(CUTFILE_KEY, uri.getPath()); } diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java index 315ce2a..367046c 100644 --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/lib/ConfiguratorBase.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.hadoopImpl.mapreduce.lib; +import java.io.File; import java.io.IOException; import java.io.StringReader; import java.io.StringWriter; @@ -43,6 +44,8 @@ public class ConfiguratorBase { CLIENT_PROPS, CLIENT_PROPS_FILE, IS_CONFIGURED, STORE_JOB_CALLED } + public static final String clientPropsFileName = "propsfile"; + /** * Configuration keys for general configuration options. * @@ -62,6 +65,7 @@ public class ConfiguratorBase { * @return the configuration key * @since 1.6.0 */ + protected static String enumToConfKey(Class<?> implementingClass, Enum<?> e) { return implementingClass.getSimpleName() + "." + e.getDeclaringClass().getSimpleName() + "." + StringUtils.camelize(e.name().toLowerCase()); @@ -83,7 +87,8 @@ public class ConfiguratorBase { Properties props, String clientPropsPath) { if (clientPropsPath != null) { try { - DistributedCacheHelper.addCacheFile(new URI(clientPropsPath), conf); + DistributedCacheHelper.addCacheFile(new URI(clientPropsPath + "#" + clientPropsFileName), + conf); } catch (URISyntaxException e) { throw new IllegalStateException("Unable to add client properties file \"" + clientPropsPath + "\" to distributed cache."); @@ -107,13 +112,18 @@ public class ConfiguratorBase { conf.get(enumToConfKey(implementingClass, ClientOpts.CLIENT_PROPS_FILE), ""); if (!clientPropsFile.isEmpty()) { try { - URI[] uris = DistributedCacheHelper.getCacheFiles(conf); - Path path = null; - for (URI u : uris) { - if (u.toString().equals(clientPropsFile)) { - path = new Path(u); - } + Path path; + // See if the "propsfile" symlink was created and try to open the file it points to by it. + File tempFile = new File(ConfiguratorBase.clientPropsFileName); + if (tempFile.exists()) { + path = new Path(ConfiguratorBase.clientPropsFileName); + } else { + path = new Path(clientPropsFile); } + + if (path == null) + throw new IllegalStateException("Could not initialize properties file"); + FileSystem fs = FileSystem.get(conf); FSDataInputStream inputStream = fs.open(path); StringBuilder sb = new StringBuilder(); diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java index c43b001..25a814a 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java @@ -24,8 +24,13 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.io.PrintStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Iterator; import java.util.Map.Entry; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -34,6 +39,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.clientImpl.ClientInfo; import org.apache.accumulo.core.clientImpl.Credentials; +import org.apache.accumulo.core.clientImpl.mapreduce.lib.ConfiguratorBase; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; @@ -84,6 +90,29 @@ public class TokenFileIT extends AccumuloClusterHarness { m.put("", "", Integer.toString(count)); context.write(new Text(), m); } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + if (context.getCacheFiles() != null && context.getCacheFiles().length > 0) { + // At this point in the MapReduce Job you can get the cached files in HDFS if you want + URI[] cachedFiles = context.getCacheFiles(); + // On the line below we access the file by the hdfs fragment name created during caching + // in ConfiguratorBase + String fileByPsuedonym = ""; + fileByPsuedonym = getFileContents(ConfiguratorBase.cachedFileName); + + assertTrue(!fileByPsuedonym.isEmpty()); + assertTrue(cachedFiles.length > 0); + } + super.setup(context); + } + + private String getFileContents(String filename) throws IOException { + + Path filePath = Paths.get(filename); + return Files.lines(filePath).collect(Collectors.joining(System.lineSeparator())); + } + } @Override @@ -139,7 +168,6 @@ public class TokenFileIT extends AccumuloClusterHarness { return 1; } } - } @Rule