ctubbsii commented on code in PR #4898:
URL: https://github.com/apache/accumulo/pull/4898#discussion_r1919333556
##########
core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java:
##########
@@ -92,12 +93,17 @@ public class RFileWriter implements AutoCloseable {
private final FileSKVWriter writer;
private final LRUMap<ByteSequence,Boolean> validVisibilities;
+
+ // TODO should be able to completely remove this as lower level code is
already doing some things
Review Comment:
What's going on with this TODO?
##########
core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java:
##########
@@ -172,6 +183,12 @@ public WriterOptions withVisibilityCacheSize(int maxSize) {
return this;
}
+ @Override
+ public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver)
{
+ this.splitResolver = splitResolver;
Review Comment:
```suggestion
this.splitResolver = requireNonNull(splitResolver);
```
(assuming static import of `java.util.Objects.requireNonNull`)
##########
core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java:
##########
@@ -250,5 +259,27 @@ public void append(Iterable<Entry<Key,Value>> keyValues)
throws IOException {
@Override
public void close() throws IOException {
writer.close();
+ loadPlanCollector.close();
Review Comment:
```suggestion
try {
writer.close();
} finally {
loadPlanCollector.close();
}
```
##########
core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java:
##########
@@ -228,4 +242,225 @@ public LoadPlan build() {
}
};
}
+
+ private static final TableId FAKE_ID = TableId.of("999");
+
+ private static class JsonDestination {
+ String fileName;
+ String startRow;
+ String endRow;
+ RangeType rangeType;
+
+ JsonDestination() {}
+
+ JsonDestination(Destination destination) {
+ fileName = destination.getFileName();
+ startRow = destination.getStartRow() == null ? null
+ : Base64.getUrlEncoder().encodeToString(destination.getStartRow());
+ endRow = destination.getEndRow() == null ? null
+ : Base64.getUrlEncoder().encodeToString(destination.getEndRow());
+ rangeType = destination.getRangeType();
+ }
+
+ Destination toDestination() {
+ return new Destination(fileName, rangeType,
+ startRow == null ? null : Base64.getUrlDecoder().decode(startRow),
+ endRow == null ? null : Base64.getUrlDecoder().decode(endRow));
+ }
+ }
+
+ private static final class JsonAll {
+ List<JsonDestination> destinations;
+
+ JsonAll() {}
+
+ JsonAll(List<Destination> destinations) {
+ this.destinations =
+
destinations.stream().map(JsonDestination::new).collect(Collectors.toList());
+ }
+
+ }
+
+ private static final Gson gson = new
GsonBuilder().disableJdkUnsafe().serializeNulls().create();
+
+ /**
+ * Serializes the load plan to json that looks like the following. The
values of startRow and
+ * endRow field are base64 encoded using {@link Base64#getUrlEncoder()}.
+ *
+ * <pre>
+ * {
+ * "destinations": [
+ * {
+ * "fileName": "f1.rf",
+ * "startRow": null,
+ * "endRow": "MDAz",
+ * "rangeType": "TABLE"
+ * },
+ * {
+ * "fileName": "f2.rf",
+ * "startRow": "MDA0",
+ * "endRow": "MDA3",
+ * "rangeType": "FILE"
+ * },
+ * {
+ * "fileName": "f1.rf",
+ * "startRow": "MDA1",
+ * "endRow": "MDA2",
+ * "rangeType": "TABLE"
+ * },
+ * {
+ * "fileName": "f3.rf",
+ * "startRow": "MDA4",
+ * "endRow": null,
+ * "rangeType": "TABLE"
+ * }
+ * ]
+ * }
+ * </pre>
+ *
+ * @since 2.1.4
+ */
+ public String toJson() {
+ return gson.toJson(new JsonAll(destinations));
+ }
+
+ /**
+ * Deserializes json to a load plan.
+ *
+ * @param json produced by {@link #toJson()}
+ */
+ public static LoadPlan fromJson(String json) {
+ var dests = gson.fromJson(json, JsonAll.class).destinations.stream()
+
.map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList());
+ return new LoadPlan(dests);
+ }
+
+ /**
+ * Represents two split points that exist in a table being bulk imported to.
+ *
+ * @since 2.1.4
+ */
+ public static class TableSplits {
+ private final Text prevRow;
+ private final Text endRow;
+
+ public TableSplits(Text prevRow, Text endRow) {
+ Preconditions.checkArgument(
+ prevRow == null || endRow == null || prevRow.compareTo(endRow) < 0,
"%s >= %s", prevRow,
+ endRow);
+ this.prevRow = prevRow;
+ this.endRow = endRow;
+ }
+
+ public Text getPrevRow() {
+ return prevRow;
+ }
+
+ public Text getEndRow() {
+ return endRow;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TableSplits that = (TableSplits) o;
+ return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow,
that.endRow);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(prevRow, endRow);
+ }
+
+ @Override
+ public String toString() {
+ return "(" + prevRow + "," + endRow + "]";
+ }
+ }
+
+ /**
+ * A function that maps a row to two table split points that contain the
row. These splits must
+ * exist in the table being bulk imported to. There is no requirement that
the splits are
+ * contiguous. For example if a table has splits C,D,E,M and we ask for
splits containing row H
+ * its ok to return D,M, but that could result in the file mapping to more
actual tablets than
+ * needed.
+ *
+ * @since 2.1.4
+ */
+ public interface SplitResolver extends Function<Text,TableSplits> {
+ static SplitResolver from(SortedSet<Text> splits) {
+ return row -> {
+ var headSet = splits.headSet(row);
+ Text prevRow = headSet.isEmpty() ? null : headSet.last();
+ var tailSet = splits.tailSet(row);
+ Text endRow = tailSet.isEmpty() ? null : tailSet.first();
+ return new TableSplits(prevRow, endRow);
+ };
+ }
+
+ /**
+ * For a given row R this function should find two split points S1 and S2
that exist in the
+ * table being bulk imported to such that S1 < R <= S2. The closer
S1 and S2 are to each
+ * other the better.
+ */
Review Comment:
```suggestion
/**
* For a given row, R, this function should find two split points, S1
and S2, that exist in the
* table being bulk imported to, such that S1 < R <= S2. The
closer S1 and S2 are to each
* other, the better.
*/
```
##########
core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java:
##########
@@ -228,4 +242,225 @@ public LoadPlan build() {
}
};
}
+
+ private static final TableId FAKE_ID = TableId.of("999");
Review Comment:
This arbitrary value could use a comment
##########
core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java:
##########
@@ -106,6 +108,13 @@ public RFileWriter build() throws IOException {
CryptoService cs =
CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE,
tableConfig);
+ LoadPlanCollector loadPlanCollector;
+ if (splitResolver != null) {
+ loadPlanCollector = new LoadPlanCollector(splitResolver);
+ } else {
+ loadPlanCollector = new LoadPlanCollector();
+ }
Review Comment:
This might fit on one line. It's also nice to flip the null check: the "else"
block of a "!=" condition reads as "not the case that it's not equal" (i.e.,
"not not equal"), so testing "==" first removes the double negative.
```suggestion
var loadPlanCollector = splitResolver == null ? new LoadPlanCollector()
: new LoadPlanCollector(splitResolver);
```
You could also make the constructor take an Optional and pass it in as
`Optional.ofNullable(splitResolver)`
##########
core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java:
##########
@@ -428,6 +429,15 @@ default WriterOptions
withSummarizers(SummarizerConfiguration... summarizerConf)
*/
WriterOptions withVisibilityCacheSize(int maxSize);
+ /**
+ * @param splitResolver builds a {@link LoadPlan} using table split points
provided by the given
+ * splitResolver.
+ * @return this
+ * @see RFileWriter#getLoadPlan(String)
+ * @since 2.1.4
+ */
+ WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver);
Review Comment:
Adding this breaks semver. However, I don't think the RFile API is widely
used, so it may be okay to just document and accept the breakage in the release
notes.
##########
core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client.rfile;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.LoadPlan;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+
+class LoadPlanCollector {
+
+ private final LoadPlan.SplitResolver splitResolver;
+ private boolean finished = false;
+ private Text lgFirstRow;
+ private Text lgLastRow;
+ private Text firstRow;
+ private Text lastRow;
+ private Set<KeyExtent> overlappingExtents;
+ private KeyExtent currentExtent;
+ private long appended = 0;
+
+ LoadPlanCollector(LoadPlan.SplitResolver splitResolver) {
+ this.splitResolver = splitResolver;
+ this.overlappingExtents = new HashSet<>();
+ }
+
+ LoadPlanCollector() {
+ splitResolver = null;
+ this.overlappingExtents = null;
+
+ }
+
+ private void appendNoSplits(Key key) {
+ if (lgFirstRow == null) {
+ lgFirstRow = key.getRow();
+ lgLastRow = lgFirstRow;
+ } else {
+ var row = key.getRow();
+ lgLastRow = row;
+ }
+ }
+
+ private static final TableId FAKE_ID = TableId.of("123");
Review Comment:
This arbitrary value could use a comment
##########
core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java:
##########
@@ -250,5 +259,27 @@ public void append(Iterable<Entry<Key,Value>> keyValues)
throws IOException {
@Override
public void close() throws IOException {
writer.close();
+ loadPlanCollector.close();
+ }
+
+ /**
+ * If no split resolver was provided when the RFileWriter was built then
this method will return a
+ * simple load plan of type {@link
org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using
+ * the first and last row seen. If a splitResolver was provided then this
will return a load plan
+ * of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE}
that has the split
+ * ranges the rows written overlapped.
+ *
+ * @param filename This file name will be used in the load plan and it
should match the name that
+ * will be used when bulk importing this file. Only a filename is
needed, not a full path.
+ * @return load plan computed from the keys written to the rfile.
+ * @see
org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver)
+ * @since 2.1.4
+ * @throws IllegalStateException is attempting to get load plan before
calling {@link #close()}
+ * @throws IllegalArgumentException is a full path is passed instead of a
filename
+ */
+ public LoadPlan getLoadPlan(String filename) {
+ Preconditions.checkArgument(!filename.contains("/"),
+ "Unexpected path %s seen instead of file name", filename);
+ return loadPlanCollector.getLoadPlan(filename);
Review Comment:
Same semver comment as on `RFile.WriterOptions#withSplitResolver`: this new
public method is also an API addition.
##########
core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java:
##########
@@ -228,4 +242,225 @@ public LoadPlan build() {
}
};
}
+
+ private static final TableId FAKE_ID = TableId.of("999");
+
+ private static class JsonDestination {
+ String fileName;
+ String startRow;
+ String endRow;
+ RangeType rangeType;
+
+ JsonDestination() {}
+
+ JsonDestination(Destination destination) {
+ fileName = destination.getFileName();
+ startRow = destination.getStartRow() == null ? null
+ : Base64.getUrlEncoder().encodeToString(destination.getStartRow());
+ endRow = destination.getEndRow() == null ? null
+ : Base64.getUrlEncoder().encodeToString(destination.getEndRow());
+ rangeType = destination.getRangeType();
+ }
+
+ Destination toDestination() {
+ return new Destination(fileName, rangeType,
+ startRow == null ? null : Base64.getUrlDecoder().decode(startRow),
+ endRow == null ? null : Base64.getUrlDecoder().decode(endRow));
+ }
+ }
+
+ private static final class JsonAll {
+ List<JsonDestination> destinations;
+
+ JsonAll() {}
+
+ JsonAll(List<Destination> destinations) {
+ this.destinations =
+
destinations.stream().map(JsonDestination::new).collect(Collectors.toList());
+ }
+
+ }
+
+ private static final Gson gson = new
GsonBuilder().disableJdkUnsafe().serializeNulls().create();
+
+ /**
+ * Serializes the load plan to json that looks like the following. The
values of startRow and
+ * endRow field are base64 encoded using {@link Base64#getUrlEncoder()}.
+ *
+ * <pre>
+ * {
+ * "destinations": [
+ * {
+ * "fileName": "f1.rf",
+ * "startRow": null,
+ * "endRow": "MDAz",
+ * "rangeType": "TABLE"
+ * },
+ * {
+ * "fileName": "f2.rf",
+ * "startRow": "MDA0",
+ * "endRow": "MDA3",
+ * "rangeType": "FILE"
+ * },
+ * {
+ * "fileName": "f1.rf",
+ * "startRow": "MDA1",
+ * "endRow": "MDA2",
+ * "rangeType": "TABLE"
+ * },
+ * {
+ * "fileName": "f3.rf",
+ * "startRow": "MDA4",
+ * "endRow": null,
+ * "rangeType": "TABLE"
+ * }
+ * ]
+ * }
+ * </pre>
+ *
+ * @since 2.1.4
+ */
+ public String toJson() {
+ return gson.toJson(new JsonAll(destinations));
+ }
+
+ /**
+ * Deserializes json to a load plan.
+ *
+ * @param json produced by {@link #toJson()}
+ */
+ public static LoadPlan fromJson(String json) {
+ var dests = gson.fromJson(json, JsonAll.class).destinations.stream()
+
.map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList());
+ return new LoadPlan(dests);
+ }
+
+ /**
+ * Represents two split points that exist in a table being bulk imported to.
+ *
+ * @since 2.1.4
+ */
+ public static class TableSplits {
+ private final Text prevRow;
+ private final Text endRow;
+
+ public TableSplits(Text prevRow, Text endRow) {
+ Preconditions.checkArgument(
+ prevRow == null || endRow == null || prevRow.compareTo(endRow) < 0,
"%s >= %s", prevRow,
+ endRow);
+ this.prevRow = prevRow;
+ this.endRow = endRow;
+ }
+
+ public Text getPrevRow() {
+ return prevRow;
+ }
+
+ public Text getEndRow() {
+ return endRow;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TableSplits that = (TableSplits) o;
+ return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow,
that.endRow);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(prevRow, endRow);
+ }
+
+ @Override
+ public String toString() {
+ return "(" + prevRow + "," + endRow + "]";
+ }
+ }
+
+ /**
+ * A function that maps a row to two table split points that contain the
row. These splits must
+ * exist in the table being bulk imported to. There is no requirement that
the splits are
Review Comment:
The requirement that the splits exist leaves open the question of whether
either one can be null: a null first split (prevRow) to represent the first
tablet, or a null second split (endRow) to represent the last tablet. I would
add a clarification for those cases here in the javadoc.
##########
core/src/main/java/org/apache/accumulo/core/file/FileOperations.java:
##########
@@ -53,6 +53,13 @@ public abstract class FileOperations {
Set.of(Constants.BULK_LOAD_MAPPING, Constants.BULK_RENAME_FILE,
FileOutputCommitter.SUCCEEDED_FILE_NAME, HADOOP_JOBHISTORY_LOCATION);
+ public static boolean isBulkWorkingFile(String fileName) {
+ if (fileName.startsWith(Constants.BULK_WORKING_PREFIX)) {
+ return true;
+ }
+ return bulkWorkingFiles.contains(fileName);
Review Comment:
```suggestion
return fileName.startsWith(Constants.BULK_WORKING_PREFIX) ||
bulkWorkingFiles.contains(fileName);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]