keith-turner commented on a change in pull request #2117:
URL: https://github.com/apache/accumulo/pull/2117#discussion_r645846717
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
}
throw new RuntimeException("Unknown type of entry: " + event);
}
+
+ /**
+ * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+ * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+ * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+ * returned by eventType(). The column family is always the event. The column qualifier is
+ * dependent on the type of event and could be empty.
+ *
+ * <pre>
+ * Key Schema:
+ * Row = EventNum_tabletID_seq
+ * Family = event
+ * Qualifier = tserverSession OR filename OR KeyExtent
+ * </pre>
+ */
+ public Key toKey() throws IOException {
+ String row = "";
+ int eventNum = eventType(event);
+ String family = event.name();
+ String qual = "";
+ switch (event) {
+ case OPEN:
+ row = formatRow(eventNum, 0, 0);
+ qual = tserverSession;
+ break;
+ case COMPACTION_START:
+ row = formatRow(eventNum, tabletId, seq);
+ if (filename != null)
+ qual = filename;
+ break;
+ case MUTATION:
+ case MANY_MUTATIONS:
+ case COMPACTION_FINISH:
+ row = formatRow(eventNum, tabletId, seq);
+ break;
+ case DEFINE_TABLET:
+ row = formatRow(eventNum, tabletId, seq);
+ // Base64 encode KeyExtent
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ tablet.writeTo(buffer);
+ qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+ buffer.close();
+ break;
+ }
+ return new Key(new Text(row), new Text(family), new Text(qual));
+ }
+
+ // format row = 1_000001_0000000001
+ private String formatRow(int eventNum, int tabletId, long seq) {
+ return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+ }
Review comment:
Something like the following would be much faster and more compact. This
would also make decoding the row really fast. I think it should sort the
same. Also, I'm not sure how `%06d` will handle tablet ids over 10 million.
I looked at the source code of DataOutputStream's writeLong and writeInt to
see what they did.
```java
private byte[] formatRow(byte eventNum, int tabletId, long seq) {
  byte[] row = new byte[13];
  row[0] = eventNum;
  row[1] = (byte) ((tabletId >>> 24) & 0xff);
  row[2] = (byte) ((tabletId >>> 16) & 0xff);
  row[3] = (byte) ((tabletId >>> 8) & 0xff);
  row[4] = (byte) (tabletId & 0xff);
  row[5] = (byte) (seq >>> 56);
  row[6] = (byte) (seq >>> 48);
  row[7] = (byte) (seq >>> 40);
  row[8] = (byte) (seq >>> 32);
  row[9] = (byte) (seq >>> 24);
  row[10] = (byte) (seq >>> 16);
  row[11] = (byte) (seq >>> 8);
  row[12] = (byte) seq;
  return row;
}
```
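A sketch of the matching decode, assuming the 13-byte layout above (byte 0 = event, bytes 1-4 = big-endian tabletId, bytes 5-12 = big-endian seq). `RowCodec` and its method names are invented for illustration; `ByteBuffer` is big-endian by default, so it emits the same bytes as the manual shifts:

```java
import java.nio.ByteBuffer;

// Illustrative helper (names are hypothetical, not from the PR). ByteBuffer
// writes big-endian by default, so putInt/putLong produce the same bytes as
// the manual shifts above, and the rows sort the same way.
public class RowCodec {

  static byte[] formatRow(byte eventNum, int tabletId, long seq) {
    ByteBuffer buf = ByteBuffer.allocate(13);
    buf.put(eventNum).putInt(tabletId).putLong(seq);
    return buf.array();
  }

  // Decoding is just fixed-offset reads: no string parsing or splitting.
  static int decodeTabletId(byte[] row) {
    return ByteBuffer.wrap(row, 1, 4).getInt();
  }

  static long decodeSeq(byte[] row) {
    return ByteBuffer.wrap(row, 5, 8).getLong();
  }
}
```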
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
}
throw new RuntimeException("Unknown type of entry: " + event);
}
+
+ /**
+ * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+ * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+ * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+ * returned by eventType(). The column family is always the event. The column qualifier is
+ * dependent on the type of event and could be empty.
+ *
+ * <pre>
+ * Key Schema:
+ * Row = EventNum_tabletID_seq
+ * Family = event
+ * Qualifier = tserverSession OR filename OR KeyExtent
+ * </pre>
+ */
+ public Key toKey() throws IOException {
+ String row = "";
+ int eventNum = eventType(event);
+ String family = event.name();
+ String qual = "";
+ switch (event) {
+ case OPEN:
+ row = formatRow(eventNum, 0, 0);
+ qual = tserverSession;
+ break;
+ case COMPACTION_START:
+ row = formatRow(eventNum, tabletId, seq);
+ if (filename != null)
Review comment:
I think a null filename is unexpected for this event type, so if it is null
I would let an NPE happen rather than skip it. The code that serializes to a
DataOutput would throw an NPE if it were null anyway. Should it happen to be
null, it would be better to fail before writing than to mask the issue.
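One way to fail fast, sketched with a hypothetical helper (`requireFilename` is not from the PR; only the `Objects.requireNonNull` idiom is the point):

```java
import java.util.Objects;

// Hypothetical illustration of failing fast instead of silently writing an
// empty qualifier when a COMPACTION_START event has no filename.
public class FailFast {

  static String requireFilename(String filename) {
    return Objects.requireNonNull(filename,
        "COMPACTION_START event is missing its filename");
  }
}
```

With this, a bad event is rejected before anything is written, so the error surfaces at the point of the bug rather than during a later deserialization.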
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
/**
 * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
*/
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {
Review comment:
Could this implement `Iterator<Entry<LogFileKey,LogFileValue>>`? It seems
like the code that uses it always converts Key to LogFileKey. If that
conversion happened in this class, it might simplify the calling code a bit.
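One way to push the conversion into the class is a small mapping iterator. This generic sketch (class and names invented here, not part of the PR) shows the shape; `RecoveryLogsIterator` could apply something like `LogFileKey.fromKey` as the mapper:

```java
import java.util.Iterator;
import java.util.function.Function;

// Generic adapter: exposes an Iterator<B> over an Iterator<A> by applying a
// conversion to each element. RecoveryLogsIterator could wrap its merged
// Iterator<Entry<Key,Value>> this way and hand out LogFileKey/LogFileValue.
public class MappingIterator<A, B> implements Iterator<B> {

  private final Iterator<A> source;
  private final Function<A, B> mapper;

  public MappingIterator(Iterator<A> source, Function<A, B> mapper) {
    this.source = source;
    this.mapper = mapper;
  }

  @Override
  public boolean hasNext() {
    return source.hasNext();
  }

  @Override
  public B next() {
    return mapper.apply(source.next());
  }
}
```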
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
}
throw new RuntimeException("Unknown type of entry: " + event);
}
+
+ /**
+ * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+ * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+ * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+ * returned by eventType(). The column family is always the event. The column qualifier is
+ * dependent on the type of event and could be empty.
+ *
+ * <pre>
+ * Key Schema:
+ * Row = EventNum_tabletID_seq
+ * Family = event
+ * Qualifier = tserverSession OR filename OR KeyExtent
+ * </pre>
+ */
+ public Key toKey() throws IOException {
+ String row = "";
+ int eventNum = eventType(event);
+ String family = event.name();
+ String qual = "";
+ switch (event) {
+ case OPEN:
+ row = formatRow(eventNum, 0, 0);
+ qual = tserverSession;
+ break;
+ case COMPACTION_START:
+ row = formatRow(eventNum, tabletId, seq);
+ if (filename != null)
+ qual = filename;
+ break;
+ case MUTATION:
+ case MANY_MUTATIONS:
+ case COMPACTION_FINISH:
+ row = formatRow(eventNum, tabletId, seq);
+ break;
+ case DEFINE_TABLET:
+ row = formatRow(eventNum, tabletId, seq);
+ // Base64 encode KeyExtent
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ tablet.writeTo(buffer);
+ qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+ buffer.close();
+ break;
+ }
+ return new Key(new Text(row), new Text(family), new Text(qual));
+ }
+
+ // format row = 1_000001_0000000001
+ private String formatRow(int eventNum, int tabletId, long seq) {
+ return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+ }
+
+ /**
+ * Create LogFileKey from row. Follows schema defined by {@link #toKey()}
+ */
+ public static LogFileKey fromKey(Key key) throws IOException {
+ var logFileKey = new LogFileKey();
+ String[] rowParts = key.getRow().toString().split("_");
+ int tabletId = Integer.parseInt(rowParts[1]);
+ long seq = Long.parseLong(rowParts[2]);
+ String qualifier = key.getColumnQualifier().toString();
+
+ logFileKey.tabletId = tabletId;
+ logFileKey.seq = seq;
+ logFileKey.event = LogEvents.valueOf(key.getColumnFamily().toString());
+
+ // handle special cases of what is stored in the qualifier
+ switch (logFileKey.event) {
+ case OPEN:
+ logFileKey.tserverSession = qualifier;
+ break;
+ case COMPACTION_START:
+ logFileKey.filename = qualifier;
+ break;
+ case DEFINE_TABLET:
+ // decode Base64 KeyExtent
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] bytes = Base64.getDecoder().decode(qualifier);
+ buffer.reset(bytes, bytes.length);
+ logFileKey.tablet = KeyExtent.readFrom(buffer);
+ buffer.close();
+ break;
+ }
Review comment:
Could do something like the following to cover all cases.
```suggestion
case COMPACTION_FINISH:
case MANY_MUTATIONS:
case MUTATION:
// nothing to do
break;
default:
throw new AssertionError();
}
```
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
##########
@@ -61,56 +66,58 @@
private static final Logger log = LoggerFactory.getLogger(SortedLogRecovery.class);
- private VolumeManager fs;
+ private final ServerContext context;
- public SortedLogRecovery(VolumeManager fs) {
- this.fs = fs;
+ public SortedLogRecovery(ServerContext context) {
+ this.context = context;
}
- static LogFileKey maxKey(LogEvents event) {
+ static LogFileKey maxKey(LogEvents event, KeyExtent extent) {
LogFileKey key = new LogFileKey();
key.event = event;
key.tabletId = Integer.MAX_VALUE;
key.seq = Long.MAX_VALUE;
+ key.tablet = extent;
Review comment:
If the range is constructed using only the row, as I suggested elsewhere,
then maybe this could be set to some dummy extent with a comment saying it's
only needed for serialization, not for constructing a range.
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
}
throw new RuntimeException("Unknown type of entry: " + event);
}
+
+ /**
+ * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+ * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+ * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+ * returned by eventType(). The column family is always the event. The column qualifier is
+ * dependent on the type of event and could be empty.
+ *
+ * <pre>
+ * Key Schema:
+ * Row = EventNum_tabletID_seq
+ * Family = event
+ * Qualifier = tserverSession OR filename OR KeyExtent
+ * </pre>
+ */
+ public Key toKey() throws IOException {
+ String row = "";
+ int eventNum = eventType(event);
+ String family = event.name();
+ String qual = "";
+ switch (event) {
+ case OPEN:
+ row = formatRow(eventNum, 0, 0);
+ qual = tserverSession;
+ break;
+ case COMPACTION_START:
+ row = formatRow(eventNum, tabletId, seq);
+ if (filename != null)
+ qual = filename;
+ break;
+ case MUTATION:
+ case MANY_MUTATIONS:
+ case COMPACTION_FINISH:
+ row = formatRow(eventNum, tabletId, seq);
+ break;
+ case DEFINE_TABLET:
+ row = formatRow(eventNum, tabletId, seq);
+ // Base64 encode KeyExtent
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ tablet.writeTo(buffer);
+ qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
Review comment:
Could put the bytes from encoding the extent directly in the qual; binary
data is ok there. That would avoid the Base64 encode/decode.
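For a sense of what Base64 costs, a small sketch (pure JDK, not from the PR): Base64 emits 4 output characters per 3 input bytes, roughly a 33% size increase on top of the encode/decode CPU work.

```java
import java.util.Base64;

// Demonstrates the Base64 size overhead that storing raw bytes in the
// qualifier would avoid: every 3 input bytes become 4 output characters
// (with padding for partial groups).
public class QualSize {

  static int base64Length(byte[] raw) {
    return Base64.getEncoder().encodeToString(raw).length();
  }
}
```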
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileValue.java
##########
@@ -93,4 +98,23 @@ public String toString() {
return format(this, 5);
}
+ /**
+ * Convert list of mutations to a byte array and use to create a Value
+ */
+ public Value toValue() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
Review comment:
There are no great alternatives, but these built-in Java types can kill
performance because they are heavily synchronized; serializing something can
end up calling lots of synchronized methods. For cases like this, where only
a single thread will ever use the stream, that's annoying. There may be
something else to use; I will have to look around.
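As a rough illustration of the kind of alternative meant here (this class is invented for the example; Hadoop's unsynchronized `DataOutputBuffer`, already used elsewhere in this PR, is another option), a minimal single-threaded buffer needs no locking at all:

```java
import java.util.Arrays;

// Minimal unsynchronized growable byte buffer: the same shape as
// ByteArrayOutputStream but with no synchronized methods, since only one
// thread will ever touch it.
public class UnsyncByteBuffer {

  private byte[] buf = new byte[32];
  private int count;

  public void write(byte[] b, int off, int len) {
    if (count + len > buf.length) {
      // Grow geometrically so repeated writes stay amortized O(1).
      buf = Arrays.copyOf(buf, Math.max(buf.length * 2, count + len));
    }
    System.arraycopy(b, off, buf, count, len);
    count += len;
  }

  public byte[] toByteArray() {
    return Arrays.copyOf(buf, count);
  }
}
```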
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -20,60 +20,85 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
/**
 * Iterates over multiple sorted recovery logs merging them into a single sorted stream.
*/
-public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
+public class RecoveryLogsIterator implements Iterator<Entry<Key,Value>>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
- List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
- private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
+ private final List<Scanner> scanners;
+ private final Iterator<Entry<Key,Value>> iter;
/**
- * Iterates only over keys in the range [start,end].
+ * Scans the files in each recoveryLogDir over the range [start,end].
*/
- RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
- LogFileKey end) throws IOException {
+ RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+ LogFileKey end, boolean checkFirstKey, LogEvents... colFamToFetch) throws IOException {
- iterators = new ArrayList<>(recoveryLogPaths.size());
+ List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
+ scanners = new ArrayList<>();
+ Key startKey = start.toKey();
+ Key endKey = end.toKey();
+ Range range = new Range(startKey, endKey);
Review comment:
It seems like all of the important information for sorting is encoded in the
row, and the information in the row matches the LogFileKey.compareTo()
function. The other fields of the key carry extra information that is not
considered in LogFileKey.compareTo(). Therefore I am thinking the range
should be constructed using only the row, to get entire rows and not stop at
some arbitrary point in a row based on what's in the key. Not sure, but I
think this will make these changes behave in exactly the same way as the old
code. Without the change below, you may need to make the maxKey function set
an extent that is greater than all extents so that the qual is large enough.
```suggestion
Range range = new Range(startKey.getRow(), endKey.getRow());
```
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
##########
@@ -189,4 +195,90 @@ public String toString() {
}
throw new RuntimeException("Unknown type of entry: " + event);
}
+
+ /**
+ * Converts LogFileKey to Key. Creates a Key containing all of the LogFileKey fields. The fields
+ * are stored so the Key sorts maintaining the legacy sort order. The row of the Key is composed
+ * of 3 fields: EventNum + tabletID + seq (separated by underscore). The EventNum is the integer
+ * returned by eventType(). The column family is always the event. The column qualifier is
+ * dependent on the type of event and could be empty.
+ *
+ * <pre>
+ * Key Schema:
+ * Row = EventNum_tabletID_seq
+ * Family = event
+ * Qualifier = tserverSession OR filename OR KeyExtent
+ * </pre>
+ */
+ public Key toKey() throws IOException {
+ String row = "";
+ int eventNum = eventType(event);
+ String family = event.name();
+ String qual = "";
+ switch (event) {
+ case OPEN:
+ row = formatRow(eventNum, 0, 0);
+ qual = tserverSession;
+ break;
+ case COMPACTION_START:
+ row = formatRow(eventNum, tabletId, seq);
+ if (filename != null)
+ qual = filename;
+ break;
+ case MUTATION:
+ case MANY_MUTATIONS:
+ case COMPACTION_FINISH:
+ row = formatRow(eventNum, tabletId, seq);
+ break;
+ case DEFINE_TABLET:
+ row = formatRow(eventNum, tabletId, seq);
+ // Base64 encode KeyExtent
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ tablet.writeTo(buffer);
+ qual = Base64.getEncoder().encodeToString(copyOf(buffer.getData(), buffer.getLength()));
+ buffer.close();
+ break;
+ }
+ return new Key(new Text(row), new Text(family), new Text(qual));
+ }
+
+ // format row = 1_000001_0000000001
+ private String formatRow(int eventNum, int tabletId, long seq) {
+ return String.format("%d_%06d_%010d", eventNum, tabletId, seq);
+ }
+
+ /**
+ * Create LogFileKey from row. Follows schema defined by {@link #toKey()}
+ */
+ public static LogFileKey fromKey(Key key) throws IOException {
+ var logFileKey = new LogFileKey();
+ String[] rowParts = key.getRow().toString().split("_");
+ int tabletId = Integer.parseInt(rowParts[1]);
+ long seq = Long.parseLong(rowParts[2]);
+ String qualifier = key.getColumnQualifier().toString();
+
+ logFileKey.tabletId = tabletId;
+ logFileKey.seq = seq;
+ logFileKey.event = LogEvents.valueOf(key.getColumnFamily().toString());
Review comment:
This avoids converting to Text, which does a copy.
```suggestion
logFileKey.event = LogEvents.valueOf(key.getColumnFamilyData().toString());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]