This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch 1.9 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.9 by this push: new af05b27 Add colVis file to ContinuousInputFormat. Fixes #1368 (#1369) af05b27 is described below commit af05b27ae9e2a576907e7d1d96d8360f187434a7 Author: Mike Miller <mmil...@apache.org> AuthorDate: Thu Sep 19 18:46:49 2019 -0400 Add colVis file to ContinuousInputFormat. Fixes #1368 (#1369) --- .../accumulo/test/continuous/ContinuousIngest.java | 25 +++++++++++++--------- .../test/continuous/ContinuousInputFormat.java | 17 +++++++++++++-- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java index f1fd4d9..17342bc 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousIngest.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.continuous; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Collections; @@ -57,19 +58,23 @@ public class ContinuousIngest { return; } - visibilities = new ArrayList<>(); - - FileSystem fs = FileSystem.get(new Configuration()); - BufferedReader in = - new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8)); + visibilities = readVisFromFile(opts.visFile); + } - String line; + public static List<ColumnVisibility> readVisFromFile(String visFile) { + List<ColumnVisibility> vis = new ArrayList<>(); - while ((line = in.readLine()) != null) { - visibilities.add(new ColumnVisibility(line)); + try (BufferedReader in = new BufferedReader(new InputStreamReader( + FileSystem.get(new Configuration()).open(new Path(visFile)), UTF_8))) { + String line; + while ((line = in.readLine()) != null) { + vis.add(new ColumnVisibility(line)); + } + } catch (IOException e) { + System.out.println("ERROR reading visFile " + visFile + ": "); + e.printStackTrace(); } - - in.close(); + return vis; } private static ColumnVisibility getVisibility(Random rand) { diff --git a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java index 6aed14a..f140b71 100644 --- a/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java +++ b/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousInputFormat.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.zip.CRC32; @@ -34,6 +35,7 @@ import java.util.zip.Checksum; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; @@ -56,6 +58,7 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> { private static final String PROP_FAM_MAX = "mrbulk.fam.max"; private static final String PROP_QUAL_MAX = "mrbulk.qual.max"; private static final String PROP_CHECKSUM = "mrbulk.checksum"; + private static final String PROP_VIS_FILE = "mrbulk.vis.file"; private static class RandomSplit extends InputSplit implements Writable { @Override @@ -93,6 +96,8 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> { conf.setLong(PROP_ROW_MAX, opts.max); conf.setInt(PROP_FAM_MAX, opts.maxColF); conf.setInt(PROP_QUAL_MAX, opts.maxColQ); + if (opts.visFile != null) + conf.set(PROP_VIS_FILE, opts.visFile); conf.setBoolean(PROP_CHECKSUM, opts.checksum); } @@ -110,6 +115,7 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> { long maxRow; int maxFam; int maxQual; + List<ColumnVisibility> visibilities; boolean checksum; Key prevKey; @@ -126,6 +132,12 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> { maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE); maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE); checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false); + String visFile = job.getConfiguration().get(PROP_VIS_FILE); + if (visFile == null) { + visibilities = Collections.singletonList(new ColumnVisibility()); + } else { + visibilities = ContinuousIngest.readVisFromFile(visFile); + } random = new Random(new SecureRandom().nextLong()); @@ -138,15 +150,16 @@ public class ContinuousInputFormat extends InputFormat<Key,Value> { byte[] fam = genCol(random.nextInt(maxFam)); byte[] qual = genCol(random.nextInt(maxQual)); + byte[] cv = visibilities.get(random.nextInt(visibilities.size())).flatten(); if (cksum != null) { cksum.update(row); cksum.update(fam); cksum.update(qual); - cksum.update(new byte[0]); // TODO col vis + cksum.update(cv); } - return new Key(row, fam, qual); + return new Key(row, fam, qual, cv); } private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {