http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 553e8bc..70fcd2c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -36,7 +36,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.DataOperationType; @@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -224,8 +226,8 @@ public class AcidUtils { return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId); } - public static String baseDir(long txnId) { - return BASE_PREFIX + String.format(DELTA_DIGITS, txnId); + public static String baseDir(long writeId) { + return BASE_PREFIX + String.format(DELTA_DIGITS, writeId); } /** @@ -254,31 +256,31 @@ public class AcidUtils { options.getBucketId()) + "_0"); } else if (options.isWritingBase()) { subdir = BASE_PREFIX + String.format(DELTA_DIGITS, - options.getMaximumTransactionId()); + options.getMaximumWriteId()); } else if(options.getStatementId() == -1) { //when minor compaction runs, we collapse per statement delta files inside a single //transaction so we no longer need a statementId in the file name subdir = options.isWritingDeleteDelta() ? - deleteDeltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId()) - : deltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId()); + deleteDeltaSubdir(options.getMinimumWriteId(), + options.getMaximumWriteId()) + : deltaSubdir(options.getMinimumWriteId(), + options.getMaximumWriteId()); } else { subdir = options.isWritingDeleteDelta() ? - deleteDeltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId(), + deleteDeltaSubdir(options.getMinimumWriteId(), + options.getMaximumWriteId(), options.getStatementId()) - : deltaSubdir(options.getMinimumTransactionId(), - options.getMaximumTransactionId(), + : deltaSubdir(options.getMinimumWriteId(), + options.getMaximumWriteId(), options.getStatementId()); } return createBucketFile(new Path(directory, subdir), options.getBucketId()); } /** - * Get the transaction id from a base directory name. + * Get the write id from a base directory name. * @param path the base directory name - * @return the maximum transaction id that is included + * @return the maximum write id that is included */ public static long parseBase(Path path) { String filename = path.getName(); @@ -306,8 +308,8 @@ public class AcidUtils { Integer.parseInt(filename.substring(0, filename.indexOf('_'))); result .setOldStyle(true) - .minimumTransactionId(0) - .maximumTransactionId(0) + .minimumWriteId(0) + .maximumWriteId(0) .bucket(bucket) .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX)); } @@ -318,8 +320,8 @@ public class AcidUtils { int copyNumber = Integer.parseInt(filename.substring(filename.lastIndexOf('_') + 1)); result .setOldStyle(true) - .minimumTransactionId(0) - .maximumTransactionId(0) + .minimumWriteId(0) + .maximumWriteId(0) .bucket(bucket) .copyNumber(copyNumber) .writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX)); @@ -330,8 +332,8 @@ public class AcidUtils { if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) { result .setOldStyle(false) - .minimumTransactionId(0) - .maximumTransactionId(parseBase(bucketFile.getParent())) + .minimumWriteId(0) + .maximumWriteId(parseBase(bucketFile.getParent())) .bucket(bucket) .writingBase(true); } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) { @@ -339,21 +341,21 @@ public class AcidUtils { bucketFile.getFileSystem(conf)); result .setOldStyle(false) - .minimumTransactionId(parsedDelta.minTransaction) - .maximumTransactionId(parsedDelta.maxTransaction) + .minimumWriteId(parsedDelta.minWriteId) + .maximumWriteId(parsedDelta.maxWriteId) .bucket(bucket); } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) { ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX, bucketFile.getFileSystem(conf)); result .setOldStyle(false) - .minimumTransactionId(parsedDelta.minTransaction) - .maximumTransactionId(parsedDelta.maxTransaction) + .minimumWriteId(parsedDelta.minWriteId) + .maximumWriteId(parsedDelta.maxWriteId) .bucket(bucket); } } else { - result.setOldStyle(true).bucket(-1).minimumTransactionId(0) - .maximumTransactionId(0); + result.setOldStyle(true).bucket(-1).minimumWriteId(0) + .maximumWriteId(0); } return result; } @@ -637,8 +639,8 @@ public class AcidUtils { * Immutable */ public static final class ParsedDelta implements Comparable<ParsedDelta> { - private final long minTransaction; - private final long maxTransaction; + private final long minWriteId; + private final long maxWriteId; private final FileStatus path; //-1 is for internal (getAcidState()) purposes and means the delta dir //had no statement ID @@ -655,8 +657,8 @@ public class AcidUtils { } private ParsedDelta(long min, long max, FileStatus path, int statementId, boolean isDeleteDelta, boolean isRawFormat) { - this.minTransaction = min; - this.maxTransaction = max; + this.minWriteId = min; + this.maxWriteId = max; this.path = path; this.statementId = statementId; this.isDeleteDelta = isDeleteDelta; @@ -664,12 +666,12 @@ public class AcidUtils { assert !isDeleteDelta || !isRawFormat : " deleteDelta should not be raw format"; } - public long getMinTransaction() { - return minTransaction; + public long getMinWriteId() { + return minWriteId; } - public long getMaxTransaction() { - return maxTransaction; + public long getMaxWriteId() { + return maxWriteId; } public Path getPath() { @@ -698,14 +700,14 @@ public class AcidUtils { */ @Override public int compareTo(ParsedDelta parsedDelta) { - if (minTransaction != parsedDelta.minTransaction) { - if (minTransaction < parsedDelta.minTransaction) { + if (minWriteId != parsedDelta.minWriteId) { + if (minWriteId < parsedDelta.minWriteId) { return -1; } else { return 1; } - } else if (maxTransaction != parsedDelta.maxTransaction) { - if (maxTransaction < parsedDelta.maxTransaction) { + } else if (maxWriteId != parsedDelta.maxWriteId) { + if (maxWriteId < parsedDelta.maxWriteId) { return 1; } else { return -1; @@ -753,14 +755,17 @@ public class AcidUtils { public static List<AcidInputFormat.DeltaMetaData> serializeDeltas(List<ParsedDelta> deltas) { List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size()); AcidInputFormat.DeltaMetaData last = null; - for(ParsedDelta parsedDelta : deltas) { - if(last != null && last.getMinTxnId() == parsedDelta.getMinTransaction() && last.getMaxTxnId() == parsedDelta.getMaxTransaction()) { + for (ParsedDelta parsedDelta : deltas) { + if ((last != null) + && (last.getMinWriteId() == parsedDelta.getMinWriteId()) + && (last.getMaxWriteId() == parsedDelta.getMaxWriteId())) { last.getStmtIds().add(parsedDelta.getStatementId()); continue; } - last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinTransaction(), parsedDelta.getMaxTransaction(), new ArrayList<Integer>()); + last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinWriteId(), + parsedDelta.getMaxWriteId(), new ArrayList<Integer>()); result.add(last); - if(parsedDelta.statementId >= 0) { + if (parsedDelta.statementId >= 0) { last.getStmtIds().add(parsedDelta.getStatementId()); } } @@ -780,11 +785,11 @@ public class AcidUtils { List<Path> results = new ArrayList<Path>(deleteDeltas.size()); for(AcidInputFormat.DeltaMetaData dmd : deleteDeltas) { if(dmd.getStmtIds().isEmpty()) { - results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId()))); + results.add(new Path(root, deleteDeltaSubdir(dmd.getMinWriteId(), dmd.getMaxWriteId()))); continue; } for(Integer stmtId : dmd.getStmtIds()) { - results.add(new Path(root, deleteDeltaSubdir(dmd.getMinTxnId(), dmd.getMaxTxnId(), stmtId))); + results.add(new Path(root, deleteDeltaSubdir(dmd.getMinWriteId(), dmd.getMaxWriteId(), stmtId))); } } return results.toArray(new Path[results.size()]); @@ -802,8 +807,8 @@ public class AcidUtils { throws IOException { ParsedDelta p = parsedDelta(path.getPath(), deltaPrefix, fs); boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX); - return new ParsedDelta(p.getMinTransaction(), - p.getMaxTransaction(), path, p.statementId, isDeleteDelta, p.isRawFormat()); + return new ParsedDelta(p.getMinWriteId(), + p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat()); } public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs) @@ -856,16 +861,16 @@ public class AcidUtils { @VisibleForTesting public static Directory getAcidState(Path directory, Configuration conf, - ValidTxnList txnList + ValidWriteIdList writeIdList ) throws IOException { - return getAcidState(directory, conf, txnList, false, false); + return getAcidState(directory, conf, writeIdList, false, false); } /** State class for getChildState; cannot modify 2 things in a method. */ private static class TxnBase { private FileStatus status; - private long txn = 0; - private long oldestBaseTxnId = Long.MAX_VALUE; + private long writeId = 0; + private long oldestBaseWriteId = Long.MAX_VALUE; private Path oldestBase = null; } @@ -876,22 +881,22 @@ public class AcidUtils { * transaction id that we must exclude. * @param directory the partition directory to analyze * @param conf the configuration - * @param txnList the list of transactions that we are reading + * @param writeIdList the list of write ids that we are reading * @return the state of the directory * @throws IOException */ public static Directory getAcidState(Path directory, Configuration conf, - ValidTxnList txnList, + ValidWriteIdList writeIdList, boolean useFileIds, boolean ignoreEmptyFiles ) throws IOException { - return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles, null); + return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null); } public static Directory getAcidState(Path directory, Configuration conf, - ValidTxnList txnList, + ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles, Map<String, String> tblproperties) throws IOException { @@ -921,13 +926,13 @@ public class AcidUtils { final List<HdfsFileStatusWithId> original = new ArrayList<>(); if (childrenWithId != null) { for (HdfsFileStatusWithId child : childrenWithId) { - getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original, + getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); } } else { List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter); for (FileStatus child : children) { - getChildState(child, null, txnList, working, originalDirectories, original, obsolete, + getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs); } } @@ -955,30 +960,30 @@ public class AcidUtils { Collections.sort(working); //so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example //and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60, - //subject to list of 'exceptions' in 'txnList' (not show in above example). - long current = bestBase.txn; + //subject to list of 'exceptions' in 'writeIdList' (not show in above example). + long current = bestBase.writeId; int lastStmtId = -1; ParsedDelta prev = null; for(ParsedDelta next: working) { - if (next.maxTransaction > current) { + if (next.maxWriteId > current) { // are any of the new transactions ones that we care about? - if (txnList.isTxnRangeValid(current+1, next.maxTransaction) != - ValidTxnList.RangeResponse.NONE) { + if (writeIdList.isWriteIdRangeValid(current+1, next.maxWriteId) != + ValidWriteIdList.RangeResponse.NONE) { deltas.add(next); - current = next.maxTransaction; + current = next.maxWriteId; lastStmtId = next.statementId; prev = next; } } - else if(next.maxTransaction == current && lastStmtId >= 0) { + else if(next.maxWriteId == current && lastStmtId >= 0) { //make sure to get all deltas within a single transaction; multi-statement txn //generate multiple delta files with the same txnId range - //of course, if maxTransaction has already been minor compacted, all per statement deltas are obsolete + //of course, if maxWriteId has already been minor compacted, all per statement deltas are obsolete deltas.add(next); prev = next; } - else if (prev != null && next.maxTransaction == prev.maxTransaction - && next.minTransaction == prev.minTransaction + else if (prev != null && next.maxWriteId == prev.maxWriteId + && next.minWriteId == prev.minWriteId && next.statementId == prev.statementId) { // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except // the path. This may happen when we have split update and we have two types of delta @@ -1002,15 +1007,15 @@ public class AcidUtils { if(bestBase.oldestBase != null && bestBase.status == null) { /** * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given - * {@link txnList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus + * {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus * cannot have any data for an open txn. We could check {@link deltas} has files to cover * [1,n] w/o gaps but this would almost never happen...*/ - long[] exceptions = txnList.getInvalidTransactions(); - String minOpenTxn = exceptions != null && exceptions.length > 0 ? + long[] exceptions = writeIdList.getInvalidWriteIds(); + String minOpenWriteId = exceptions != null && exceptions.length > 0 ? Long.toString(exceptions[0]) : "x"; throw new IOException(ErrorMsg.ACID_NOT_ENOUGH_HISTORY.format( - Long.toString(txnList.getHighWatermark()), - minOpenTxn, bestBase.oldestBase.toString())); + Long.toString(writeIdList.getHighWatermark()), + minOpenWriteId, bestBase.oldestBase.toString())); } final Path base = bestBase.status == null ? null : bestBase.status.getPath(); @@ -1071,43 +1076,44 @@ public class AcidUtils { * causes anything written previously is ignored (hence the overwrite). In this case, base_x * is visible if txnid:x is committed for current reader. */ - private static boolean isValidBase(long baseTxnId, ValidTxnList txnList, Path baseDir, - FileSystem fs) throws IOException { - if(baseTxnId == Long.MIN_VALUE) { + private static boolean isValidBase(long baseWriteId, ValidWriteIdList writeIdList, Path baseDir, + FileSystem fs) throws IOException { + if(baseWriteId == Long.MIN_VALUE) { //such base is created by 1st compaction in case of non-acid to acid table conversion //By definition there are no open txns with id < 1. return true; } if(!MetaDataFile.isCompacted(baseDir, fs)) { //this is the IOW case - return txnList.isTxnValid(baseTxnId); + return writeIdList.isWriteIdValid(baseWriteId); } - return txnList.isValidBase(baseTxnId); + return writeIdList.isValidBase(baseWriteId); } + private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId, - ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories, + ValidWriteIdList writeIdList, List<ParsedDelta> working, List<FileStatus> originalDirectories, List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties, FileSystem fs) throws IOException { Path p = child.getPath(); String fn = p.getName(); if (fn.startsWith(BASE_PREFIX) && child.isDir()) { - long txn = parseBase(p); - if(bestBase.oldestBaseTxnId > txn) { + long writeId = parseBase(p); + if(bestBase.oldestBaseWriteId > writeId) { //keep track for error reporting bestBase.oldestBase = p; - bestBase.oldestBaseTxnId = txn; + bestBase.oldestBaseWriteId = writeId; } if (bestBase.status == null) { - if(isValidBase(txn, txnList, p, fs)) { + if(isValidBase(writeId, writeIdList, p, fs)) { bestBase.status = child; - bestBase.txn = txn; + bestBase.writeId = writeId; } - } else if (bestBase.txn < txn) { - if(isValidBase(txn, txnList, p, fs)) { + } else if (bestBase.writeId < writeId) { + if(isValidBase(writeId, writeIdList, p, fs)) { obsolete.add(bestBase.status); bestBase.status = child; - bestBase.txn = txn; + bestBase.writeId = writeId; } } else { obsolete.add(child); @@ -1118,12 +1124,12 @@ public class AcidUtils { (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX; ParsedDelta delta = parseDelta(child, deltaPrefix, fs); if (tblproperties != null && AcidUtils.isInsertOnlyTable(tblproperties) && - ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) { + ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) { aborted.add(child); } - if (txnList.isTxnRangeValid(delta.minTransaction, - delta.maxTransaction) != - ValidTxnList.RangeResponse.NONE) { + if (writeIdList.isWriteIdRangeValid(delta.minWriteId, + delta.maxWriteId) != + ValidWriteIdList.RangeResponse.NONE) { working.add(delta); } } else if (child.isDir()) { @@ -1391,7 +1397,7 @@ public class AcidUtils { * Returns the logical end of file for an acid data file. * * This relies on the fact that if delta_x_y has no committed transactions it wil be filtered out - * by {@link #getAcidState(Path, Configuration, ValidTxnList)} and so won't be read at all. + * by {@link #getAcidState(Path, Configuration, ValidWriteIdList)} and so won't be read at all. * @param file - data file to read/compute splits on */ public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException { @@ -1490,6 +1496,54 @@ public class AcidUtils { } /** + * Extract the ValidWriteIdList for the given table from the list of tables' ValidWriteIdList. + */ + public static ValidWriteIdList getTableValidWriteIdList(Configuration conf, String fullTableName) { + String txnString = conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY); + ValidTxnWriteIdList validTxnList = new ValidTxnWriteIdList(txnString); + return validTxnList.getTableValidWriteIdList(fullTableName); + } + + /** + * Set the valid write id list for the current table scan. + */ + public static void setValidWriteIdList(Configuration conf, ValidWriteIdList validWriteIds) { + conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, validWriteIds.toString()); + LOG.debug("Setting ValidWriteIdList: " + validWriteIds.toString() + + " isAcidTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, false) + + " acidProperty: " + getAcidOperationalProperties(conf)); + } + + /** + * Set the valid write id list for the current table scan. + */ + public static void setValidWriteIdList(Configuration conf, TableScanDesc tsDesc) { + if (tsDesc.isTranscationalTable()) { + String dbName = tsDesc.getDatabaseName(); + String tableName = tsDesc.getTableName(); + ValidWriteIdList validWriteIdList = getTableValidWriteIdList(conf, + AcidUtils.getFullTableName(dbName, tableName)); + if (validWriteIdList != null) { + setValidWriteIdList(conf, validWriteIdList); + } else { + // Log error if the acid table is missing from the ValidWriteIdList conf + LOG.error("setValidWriteIdList on table: " + AcidUtils.getFullTableName(dbName, tableName) + + " isAcidTable: " + true + + " acidProperty: " + getAcidOperationalProperties(conf) + + " couldn't find the ValidWriteId list from ValidTxnWriteIdList: " + + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); + throw new IllegalStateException("ACID table: " + AcidUtils.getFullTableName(dbName, tableName) + + " is missing from the ValidWriteIdList config: " + + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); + } + } + } + + public static String getFullTableName(String dbName, String tableName) { + return dbName.toLowerCase() + "." + tableName.toLowerCase(); + } + + /** * General facility to place a metadta file into a dir created by acid/compactor write. * * Load Data commands against Acid tables write {@link AcidBaseFileType#ORIGINAL_BASE} type files
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index f0d4988..71498a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -341,8 +341,8 @@ public final class HiveFileFormatUtils { .tableProperties(tableProp) .reporter(reporter) .writingBase(conf.getInsertOverwrite()) - .minimumTransactionId(conf.getTransactionId()) - .maximumTransactionId(conf.getTransactionId()) + .minimumWriteId(conf.getTableWriteId()) + .maximumWriteId(conf.getTableWriteId()) .bucket(bucket) .inspector(inspector) .recordIdColumn(rowIdColNum) http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 912eb10..7987c4e 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -36,8 +36,8 @@ import java.util.Map.Entry; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.common.ValidReadTxnList; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hive.common.util.Ref; import org.slf4j.Logger; @@ -478,12 +478,17 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits, TableDesc table, List<InputSplit> result) throws IOException { - ValidTxnList validTxnList; + ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(conf, table.getTableName()); + ValidWriteIdList validMmWriteIdList; if (AcidUtils.isInsertOnlyTable(table.getProperties())) { - String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); - validTxnList = txnString == null ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + if (validWriteIdList == null) { + throw new IOException("Insert-Only table: " + table.getTableName() + + " is missing from the ValidWriteIdList config: " + + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); + } + validMmWriteIdList = validWriteIdList; } else { - validTxnList = null; // for non-MM case + validMmWriteIdList = null; // for non-MM case } try { @@ -491,6 +496,15 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> if (tableScan != null) { AcidUtils.setAcidOperationalProperties(conf, tableScan.getConf().isTranscationalTable(), tableScan.getConf().getAcidOperationalProperties()); + + if (tableScan.getConf().isTranscationalTable() && (validWriteIdList == null)) { + throw new IOException("Acid table: " + table.getTableName() + + " is missing from the ValidWriteIdList config: " + + conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY)); + } + if (validWriteIdList != null) { + AcidUtils.setValidWriteIdList(conf, validWriteIdList); + } } } catch (HiveException e) { throw new IOException(e); @@ -500,7 +514,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> pushFilters(conf, tableScan, this.mrwork); } - Path[] finalDirs = processPathsForMmRead(dirs, conf, validTxnList); + Path[] finalDirs = processPathsForMmRead(dirs, conf, validMmWriteIdList); if (finalDirs == null) { return; // No valid inputs. } @@ -531,13 +545,13 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } public static Path[] processPathsForMmRead(List<Path> dirs, JobConf conf, - ValidTxnList validTxnList) throws IOException { - if (validTxnList == null) { + ValidWriteIdList validWriteIdList) throws IOException { + if (validWriteIdList == null) { return dirs.toArray(new Path[dirs.size()]); } else { List<Path> finalPaths = new ArrayList<>(dirs.size()); for (Path dir : dirs) { - processForWriteIds(dir, conf, validTxnList, finalPaths); + processForWriteIds(dir, conf, validWriteIdList, finalPaths); } if (finalPaths.isEmpty()) { LOG.warn("No valid inputs found in " + dirs); @@ -548,7 +562,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } private static void processForWriteIds(Path dir, JobConf conf, - ValidTxnList validTxnList, List<Path> finalPaths) throws IOException { + ValidWriteIdList validWriteIdList, List<Path> finalPaths) throws IOException { FileSystem fs = dir.getFileSystem(conf); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("Checking " + dir + " (root) for inputs"); @@ -574,10 +588,11 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } if (!file.isDirectory()) { Utilities.FILE_OP_LOGGER.warn("Ignoring a file not in MM directory " + path); - } else if (JavaUtils.extractTxnId(path) == null) { + } else if (JavaUtils.extractWriteId(path) == null) { subdirs.add(path); } else if (!hadAcidState) { - AcidUtils.Directory dirInfo = AcidUtils.getAcidState(currDir, conf, validTxnList, Ref.from(false), true, null); + AcidUtils.Directory dirInfo + = AcidUtils.getAcidState(currDir, conf, validWriteIdList, Ref.from(false), true, null); hadAcidState = true; // Find the base, created for IOW. @@ -890,6 +905,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> AcidUtils.setAcidOperationalProperties(job, ts.getConf().isTranscationalTable(), ts.getConf().getAcidOperationalProperties()); + AcidUtils.setValidWriteIdList(job, ts.getConf()); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java index 0c37203..1f673da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordIdentifier.java @@ -88,7 +88,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { Arrays.fill(struct, null); return; } - struct[Field.transactionId.ordinal()] = ri.getTransactionId(); + struct[Field.transactionId.ordinal()] = ri.getWriteId(); struct[Field.bucketId.ordinal()] = ri.getBucketProperty(); struct[Field.rowId.ordinal()] = ri.getRowId(); } @@ -101,20 +101,20 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { public RecordIdentifier() { } - public RecordIdentifier(long transactionId, int bucket, long rowId) { - this.transactionId = transactionId; + public RecordIdentifier(long writeId, int bucket, long rowId) { + this.transactionId = writeId; this.bucketId = bucket; this.rowId = rowId; } /** * Set the identifier. - * @param transactionId the transaction id + * @param writeId the write id * @param bucketId the bucket id * @param rowId the row id */ - public void setValues(long transactionId, int bucketId, long rowId) { - this.transactionId = transactionId; + public void setValues(long writeId, int bucketId, long rowId) { + this.transactionId = writeId; this.bucketId = bucketId; this.rowId = rowId; } @@ -134,10 +134,10 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { } /** - * What was the original transaction id for the last row? - * @return the transaction id + * What was the original write id for the last row? + * @return the write id */ - public long getTransactionId() { + public long getWriteId() { return transactionId; } @@ -223,7 +223,7 @@ public class RecordIdentifier implements WritableComparable<RecordIdentifier> { BucketCodec.determineVersion(bucketId); String s = "(" + codec.getVersion() + "." + codec.decodeWriterId(bucketId) + "." + codec.decodeStatementId(bucketId) + ")"; - return "{originalTxn: " + transactionId + ", " + bucketToString() + ", row: " + getRowId() +"}"; + return "{originalWriteId: " + transactionId + ", " + bucketToString() + ", row: " + getRowId() +"}"; } protected String bucketToString() { BucketCodec codec = http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java index 36111f0..0aed172 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java @@ -30,27 +30,26 @@ public interface RecordUpdater { /** * Insert a new record into the table. - * @param currentTransaction the transaction id of the current transaction. + * @param currentWriteId the table write id of the current transaction. * @param row the row of data to insert * @throws IOException */ - void insert(long currentTransaction, - Object row) throws IOException; + void insert(long currentWriteId, Object row) throws IOException; /** * Update an old record with a new set of values. - * @param currentTransaction the current transaction id + * @param currentWriteId the current write id * @param row the new values for the row * @throws IOException */ - void update(long currentTransaction, Object row) throws IOException; + void update(long currentWriteId, Object row) throws IOException; /** * Delete a row from the table. - * @param currentTransaction the current transaction id + * @param currentWriteId the current write id * @throws IOException */ - void delete(long currentTransaction, Object row) throws IOException; + void delete(long currentWriteId, Object row) throws IOException; /** * Flush the current set of rows to the underlying file system, so that http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 5e29070..e956485 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -49,8 +49,8 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ValidReadTxnList; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Metastore; @@ -568,7 +568,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private final boolean forceThreadpool; private final AtomicInteger cacheHitCounter = new AtomicInteger(0); private final AtomicInteger numFilesCounter = new AtomicInteger(0); - private final ValidTxnList transactionList; + private final ValidWriteIdList writeIdList; private SplitStrategyKind splitStrategyKind; private final SearchArgument sarg; private final AcidOperationalProperties acidOperationalProperties; @@ -650,8 +650,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, footerCache = useExternalCache ? metaCache : localCache; } } - String value = conf.get(ValidTxnList.VALID_TXNS_KEY); - transactionList = value == null ? new ValidReadTxnList() : new ValidReadTxnList(value); // Determine the transactional_properties of the table from the job conf stored in context. // The table properties are copied to job conf at HiveInputFormat::addSplitsForGroup(), @@ -662,6 +660,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, String transactionalProperties = conf.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); this.acidOperationalProperties = isTableTransactional ? AcidOperationalProperties.parseString(transactionalProperties) : null; + + String value = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); + writeIdList = value == null ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(value); + LOG.debug("Context:: Read ValidWriteIdList: " + writeIdList.toString() + + " isTransactionalTable: " + isTableTransactional); } @VisibleForTesting @@ -933,10 +936,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } } - - - - private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures, List<OrcSplit> splits, UserGroupInformation ugi) throws IOException { UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi; @@ -1093,8 +1092,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, /** * For plain or acid tables this is the root of the partition (or table if not partitioned). * For MM table this is delta/ or base/ dir. In MM case applying of the ValidTxnList that - * {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} normally does has already - * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidTxnList)}. + * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} normally does has already + * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidWriteIdList)}. */ private final Path dir; private final Ref<Boolean> useFileIds; @@ -1134,7 +1133,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, private AcidDirInfo callInternal() throws IOException { //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine? AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, - context.transactionList, useFileIds, true, null); + context.writeIdList, useFileIds, true, null); // find the base files (original or new style) List<AcidBaseFileInfo> baseFiles = new ArrayList<>(); if (dirInfo.getBaseDirectory() == null) { @@ -1173,8 +1172,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA; PathFilter bucketFilter = parsedDelta.isRawFormat() ? AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter; - if(parsedDelta.isRawFormat() && parsedDelta.getMinTransaction() != - parsedDelta.getMaxTransaction()) { + if (parsedDelta.isRawFormat() && parsedDelta.getMinWriteId() != parsedDelta.getMaxWriteId()) { //delta/ with files in raw format are a result of Load Data (as opposed to compaction //or streaming ingest so must have interval length == 1. throw new IllegalStateException("Delta in " + AcidUtils.AcidBaseFileType.ORIGINAL_BASE @@ -2009,12 +2007,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, final Reader.Options readOptions = OrcInputFormat.createOptionsForReader(conf); readOptions.range(split.getStart(), split.getLength()); - String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); - ValidTxnList validTxnList = txnString == null ? new ValidReadTxnList() : - new ValidReadTxnList(txnString); + String txnString = conf.get(ValidWriteIdList.VALID_WRITEIDS_KEY); + ValidWriteIdList validWriteIdList + = (txnString == null) ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); + LOG.debug("getReader:: Read ValidWriteIdList: " + validWriteIdList.toString() + + " isTransactionalTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)); + final OrcRawRecordMerger records = new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket, - validTxnList, readOptions, deltas, mergerOptions); + validWriteIdList, readOptions, deltas, mergerOptions); return new RowReader<OrcStruct>() { OrcStruct innerRecord = records.createValue(); @@ -2296,7 +2297,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, public RawReader<OrcStruct> getRawReader(Configuration conf, boolean collapseEvents, int bucket, - ValidTxnList validTxnList, + ValidWriteIdList validWriteIdList, Path baseDirectory, Path[] deltaDirectory ) throws IOException { @@ -2320,7 +2321,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, mergerOptions.rootPath(deltaDirectory[0].getParent()); } return new OrcRawRecordMerger(conf, collapseEvents, null, isOriginal, - bucket, validTxnList, new Reader.Options(), deltaDirectory, mergerOptions); + bucket, validWriteIdList, new Reader.Options(), deltaDirectory, mergerOptions); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index f1f638d..57e005d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -204,20 +204,20 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> } @Override - public void insert(long currentTransaction, Object row) throws IOException { - out.println("insert " + path + " currTxn: " + currentTransaction + + public void insert(long currentWriteId, Object row) throws IOException { + out.println("insert " + path + " currWriteId: " + currentWriteId + " obj: " + stringifyObject(row, inspector)); } @Override - public void update(long currentTransaction, Object row) throws IOException { - out.println("update " + path + " currTxn: " + currentTransaction + + public void update(long currentWriteId, Object row) throws IOException { + out.println("update " + path + " currWriteId: " + currentWriteId + " obj: " + stringifyObject(row, inspector)); } @Override - public void delete(long currentTransaction, Object row) throws IOException { - out.println("delete " + path + " currTxn: " + currentTransaction + " obj: " + row); + public void delete(long currentWriteId, Object row) throws IOException { + out.println("delete " + path + " currWriteId: " + currentWriteId + " obj: " + row); } @Override @@ -307,7 +307,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> watcher.addKey( ((IntWritable) orc.getFieldValue(OrcRecordUpdater.OPERATION)).get(), ((LongWritable) - orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(), + orc.getFieldValue(OrcRecordUpdater.ORIGINAL_WRITEID)).get(), ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(), ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get()); writer.addRow(w); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 779da4f..27f28de 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordIdentifier; @@ -60,7 +60,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ private final ObjectInspector objectInspector; private final long offset; private final long length; - private final ValidTxnList validTxnList; + private final ValidWriteIdList validWriteIdList; private final int columns; private final ReaderKey prevKey = new ReaderKey(); // this is the key less than the lowest key we need to process @@ -70,15 +70,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ // an extra value so that we can return it while reading ahead private OrcStruct extraValue; /** - * A RecordIdentifier extended with the current transaction id. This is the - * key of our merge sort with the originalTransaction, bucket, and rowId - * ascending and the currentTransaction, statementId descending. This means that if the + * A RecordIdentifier extended with the current write id. This is the + * key of our merge sort with the originalWriteId, bucket, and rowId + * ascending and the currentWriteId, statementId descending. This means that if the * reader is collapsing events to just the last update, just the first * instance of each record is required. */ @VisibleForTesting public final static class ReaderKey extends RecordIdentifier{ - private long currentTransactionId; + private long currentWriteId; /** * This is the value from delta file name which may be different from value encode in * {@link RecordIdentifier#getBucketProperty()} in case of Update/Delete. @@ -86,54 +86,54 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * or delete event. For Acid 2.0 + multi-stmt txn, it must be a delete event. * No 2 Insert events from can ever agree on {@link RecordIdentifier} */ - private int statementId;//sort on this descending, like currentTransactionId + private int statementId; //sort on this descending, like currentWriteId ReaderKey() { this(-1, -1, -1, -1, 0); } - ReaderKey(long originalTransaction, int bucket, long rowId, - long currentTransactionId) { - this(originalTransaction, bucket, rowId, currentTransactionId, 0); + ReaderKey(long originalWriteId, int bucket, long rowId, + long currentWriteId) { + this(originalWriteId, bucket, rowId, currentWriteId, 0); } /** * @param statementId - set this to 0 if N/A */ - public ReaderKey(long originalTransaction, int bucket, long rowId, - long currentTransactionId, int statementId) { - super(originalTransaction, bucket, rowId); - this.currentTransactionId = currentTransactionId; + public ReaderKey(long originalWriteId, int bucket, long rowId, + long currentWriteId, int statementId) { + super(originalWriteId, bucket, rowId); + this.currentWriteId = currentWriteId; this.statementId = statementId; } @Override public void set(RecordIdentifier other) { super.set(other); - currentTransactionId = ((ReaderKey) other).currentTransactionId; + currentWriteId = ((ReaderKey) other).currentWriteId; statementId = ((ReaderKey) other).statementId; } - public void setValues(long originalTransactionId, + public void setValues(long originalWriteId, int bucket, long rowId, - long currentTransactionId, + long currentWriteId, int statementId) { - setValues(originalTransactionId, bucket, rowId); - this.currentTransactionId = currentTransactionId; + setValues(originalWriteId, bucket, rowId); + this.currentWriteId = currentWriteId; this.statementId = statementId; } @Override public boolean equals(Object other) { return super.equals(other) && - currentTransactionId == ((ReaderKey) other).currentTransactionId + currentWriteId == ((ReaderKey) other).currentWriteId && statementId == ((ReaderKey) other).statementId//consistent with compareTo() ; } @Override public int hashCode() { int result = super.hashCode(); - result = 31 * result + (int)(currentTransactionId ^ (currentTransactionId >>> 32)); + result = 31 * result + (int)(currentWriteId ^ (currentWriteId >>> 32)); result = 31 * result + statementId; return result; } @@ -145,8 +145,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ if (sup == 0) { if (other.getClass() == ReaderKey.class) { ReaderKey oth = (ReaderKey) other; - if (currentTransactionId != oth.currentTransactionId) { - return currentTransactionId < oth.currentTransactionId ? +1 : -1; + if (currentWriteId != oth.currentWriteId) { + return currentWriteId < oth.currentWriteId ? +1 : -1; } if(statementId != oth.statementId) { return statementId < oth.statementId ? +1 : -1; @@ -162,15 +162,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * This means 1 txn modified the same row more than once */ private boolean isSameRow(ReaderKey other) { - return compareRow(other) == 0 && currentTransactionId == other.currentTransactionId; + return compareRow(other) == 0 && currentWriteId == other.currentWriteId; } - long getCurrentTransactionId() { - return currentTransactionId; + long getCurrentWriteId() { + return currentWriteId; } /** - * Compare rows without considering the currentTransactionId. + * Compare rows without considering the currentWriteId. * @param other the value to compare to * @return -1, 0, +1 */ @@ -180,9 +180,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ @Override public String toString() { - return "{originalTxn: " + getTransactionId() + ", " + - bucketToString() + ", row: " + getRowId() + ", currentTxn: " + - currentTransactionId + ", statementId: "+ statementId + "}"; + return "{originalWriteId: " + getWriteId() + ", " + + bucketToString() + ", row: " + getRowId() + ", currentWriteId " + + currentWriteId + ", statementId: "+ statementId + "}"; } } interface ReaderPair { @@ -389,9 +389,9 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ IntWritable operation = new IntWritable(OrcRecordUpdater.INSERT_OPERATION); nextRecord().setFieldValue(OrcRecordUpdater.OPERATION, operation); - nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, + nextRecord().setFieldValue(OrcRecordUpdater.CURRENT_WRITEID, new LongWritable(transactionId)); - nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, + nextRecord().setFieldValue(OrcRecordUpdater.ORIGINAL_WRITEID, new LongWritable(transactionId)); nextRecord().setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucketProperty)); @@ -403,11 +403,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ nextRecord = next; ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION)) .set(OrcRecordUpdater.INSERT_OPERATION); - ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)) + ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_WRITEID)) .set(transactionId); ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET)) .set(bucketProperty); - ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION)) + ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_WRITEID)) .set(transactionId); ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID)) .set(nextRowId); @@ -445,7 +445,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ OriginalReaderPairToRead(ReaderKey key, Reader reader, int bucketId, final RecordIdentifier minKey, final RecordIdentifier maxKey, Reader.Options options, Options mergerOptions, Configuration conf, - ValidTxnList validTxnList, int statementId) throws IOException { + ValidWriteIdList validWriteIdList, int statementId) throws IOException { super(key, bucketId, conf, mergerOptions, statementId); this.reader = reader; assert !mergerOptions.isCompacting(); @@ -472,8 +472,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()} */ //the split is from something other than the 1st file of the logical bucket - compute offset - AcidUtils.Directory directoryState = AcidUtils.getAcidState(mergerOptions.getRootPath(), - conf, validTxnList, false, true); + AcidUtils.Directory directoryState + = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true); for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) { AcidOutputFormat.Options bucketOptions = AcidUtils.parseBaseOrDeltaBucketFilename(f.getFileStatus().getPath(), conf); @@ -577,7 +577,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ OriginalReaderPairToCompact(ReaderKey key, int bucketId, Reader.Options options, Options mergerOptions, Configuration conf, - ValidTxnList validTxnList, int statementId) throws IOException { + ValidWriteIdList validWriteIdList, int statementId) throws IOException { super(key, bucketId, conf, mergerOptions, statementId); assert mergerOptions.isCompacting() : "Should only be used for Compaction"; this.conf = conf; @@ -587,8 +587,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ //when compacting each split needs to process the whole logical bucket assert options.getOffset() == 0; assert options.getMaxOffset() == Long.MAX_VALUE; - AcidUtils.Directory directoryState = AcidUtils.getAcidState( - mergerOptions.getRootPath(), conf, validTxnList, false, true); + AcidUtils.Directory directoryState + = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true); /** * Note that for reading base_x/ or delta_x_x/ with non-acid schema, * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's @@ -714,7 +714,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ boolean isTail = true; RecordIdentifier minKey = null; RecordIdentifier maxKey = null; - TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs( + TransactionMetaData tfp = TransactionMetaData.findWriteIDForSynthetcRowIDs( mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf); int bucketProperty = encodeBucketId(conf, bucket, tfp.statementId); /** @@ -939,13 +939,13 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ Reader reader, boolean isOriginal, int bucket, - ValidTxnList validTxnList, + ValidWriteIdList validWriteIdList, Reader.Options options, Path[] deltaDirectory, Options mergerOptions) throws IOException { this.collapse = collapseEvents; this.offset = options.getOffset(); this.length = options.getLength(); - this.validTxnList = validTxnList; + this.validWriteIdList = validWriteIdList; /** * @since Hive 3.0 * With split update (HIVE-14035) we have base/, delta/ and delete_delta/ - the latter only @@ -1028,7 +1028,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ AcidUtils.parseBase(mergerOptions.getBaseDir()), mergerOptions.getBaseDir()); } pair = new OriginalReaderPairToCompact(key, bucket, options, readerPairOptions, - conf, validTxnList, + conf, validWriteIdList, 0);//0 since base_x doesn't have a suffix (neither does pre acid write) } else { assert mergerOptions.getBucketPath() != null : " since this is not compaction: " @@ -1036,14 +1036,14 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ //if here it's a non-acid schema file - check if from before table was marked transactional //or in base_x/delta_x_x from Load Data Options readerPairOptions = mergerOptions; - TransactionMetaData tfp = TransactionMetaData.findTransactionIDForSynthetcRowIDs( + TransactionMetaData tfp = TransactionMetaData.findWriteIDForSynthetcRowIDs( mergerOptions.getBucketPath(), mergerOptions.getRootPath(), conf); - if(tfp.syntheticTransactionId > 0) { + if(tfp.syntheticWriteId > 0) { readerPairOptions = modifyForNonAcidSchemaRead(mergerOptions, - tfp.syntheticTransactionId, tfp.folder); + tfp.syntheticWriteId, tfp.folder); } pair = new OriginalReaderPairToRead(key, reader, bucket, keyInterval.getMinKey(), - keyInterval.getMaxKey(), options, readerPairOptions, conf, validTxnList, tfp.statementId); + keyInterval.getMaxKey(), options, readerPairOptions, conf, validWriteIdList, tfp.statementId); } } else { if(mergerOptions.isCompacting()) { @@ -1100,11 +1100,11 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ assert !deltaDir.isDeleteDelta() : delta.toString(); assert mergerOptions.isCompacting() : "during regular read anything which is not a" + " delete_delta is treated like base: " + delta; - Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions, - deltaDir.getMinTransaction(), delta); + Options rawCompactOptions = modifyForNonAcidSchemaRead(mergerOptions, deltaDir.getMinWriteId(), delta); + //this will also handle copy_N files if any ReaderPair deltaPair = new OriginalReaderPairToCompact(key, bucket, options, - rawCompactOptions, conf, validTxnList, deltaDir.getStatementId()); + rawCompactOptions, conf, validWriteIdList, deltaDir.getStatementId()); if (deltaPair.nextRecord() != null) { readers.put(key, deltaPair); } @@ -1170,24 +1170,24 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ * type files into a base_x/ or delta_x_x. The data in these are then assigned ROW_IDs at read * time and made permanent at compaction time. This is identical to how 'original' files (i.e. * those that existed in the table before it was converted to an Acid table) except that the - * transaction ID to use in the ROW_ID should be that of the transaction that ran the Load Data. + * write ID to use in the ROW_ID should be that of the transaction that ran the Load Data. */ static final class TransactionMetaData { - final long syntheticTransactionId; + final long syntheticWriteId; /** * folder which determines the transaction id to use in synthetic ROW_IDs */ final Path folder; final int statementId; - TransactionMetaData(long syntheticTransactionId, Path folder) { - this(syntheticTransactionId, folder, 0); + TransactionMetaData(long syntheticWriteId, Path folder) { + this(syntheticWriteId, folder, 0); } - TransactionMetaData(long syntheticTransactionId, Path folder, int statementId) { - this.syntheticTransactionId = syntheticTransactionId; + TransactionMetaData(long syntheticWriteId, Path folder, int statementId) { + this.syntheticWriteId = syntheticWriteId; this.folder = folder; this.statementId = statementId; } - static TransactionMetaData findTransactionIDForSynthetcRowIDs(Path splitPath, Path rootPath, + static TransactionMetaData findWriteIDForSynthetcRowIDs(Path splitPath, Path rootPath, Configuration conf) throws IOException { Path parent = splitPath.getParent(); if(rootPath.equals(parent)) { @@ -1205,10 +1205,10 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ else { AcidUtils.ParsedDelta pd = AcidUtils.parsedDelta(parent, AcidUtils.DELTA_PREFIX, parent.getFileSystem(conf)); - assert pd.getMinTransaction() == pd.getMaxTransaction() : + assert pd.getMinWriteId() == pd.getMaxWriteId() : "This a delta with raw non acid schema, must be result of single write, no compaction: " + splitPath; - return new TransactionMetaData(pd.getMinTransaction(), parent, pd.getStatementId()); + return new TransactionMetaData(pd.getMinWriteId(), parent, pd.getStatementId()); } } parent = parent.getParent(); @@ -1227,7 +1227,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ /** * This is done to read non-acid schema files ("original") located in base_x/ or delta_x_x/ which * happens as a result of Load Data statement. Setting {@code rootPath} to base_x/ or delta_x_x - * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidTxnList)} in subsequent + * causes {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} in subsequent * {@link OriginalReaderPair} object to return the files in this dir * in {@link AcidUtils.Directory#getOriginalFiles()} * @return modified clone of {@code baseOptions} @@ -1350,8 +1350,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ } // if this transaction isn't ok, skip over it - if (!validTxnList.isTxnValid( - ((ReaderKey) recordIdentifier).getCurrentTransactionId())) { + if (!validWriteIdList.isWriteIdValid( + ((ReaderKey) recordIdentifier).getCurrentWriteId())) { continue; } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index b90ce6e..2e4db22 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory; * A RecordUpdater where the files are stored as ORC. * A note on various record structures: the {@code row} coming in (as in {@link #insert(long, Object)} * for example), is a struct like <RecordIdentifier, f1, ... fn> but what is written to the file - * * is <op, otid, writerId, rowid, ctid, <f1, ... fn>> (see {@link #createEventSchema(ObjectInspector)}) + * * is <op, owid, writerId, rowid, cwid, <f1, ... fn>> (see {@link #createEventSchema(ObjectInspector)}) * So there are OIs here to make the translation. */ public class OrcRecordUpdater implements RecordUpdater { @@ -72,10 +72,10 @@ public class OrcRecordUpdater implements RecordUpdater { final static int DELETE_OPERATION = 2; //column indexes of corresponding data in storage layer final static int OPERATION = 0; - final static int ORIGINAL_TRANSACTION = 1; + final static int ORIGINAL_WRITEID = 1; final static int BUCKET = 2; final static int ROW_ID = 3; - final static int CURRENT_TRANSACTION = 4; + final static int CURRENT_WRITEID = 4; final static int ROW = 5; /** * total number of fields (above) @@ -100,8 +100,8 @@ public class OrcRecordUpdater implements RecordUpdater { private final FSDataOutputStream flushLengths; private final OrcStruct item; private final IntWritable operation = new IntWritable(); - private final LongWritable currentTransaction = new LongWritable(-1); - private final LongWritable originalTransaction = new LongWritable(-1); + private final LongWritable currentWriteId = new LongWritable(-1); + private final LongWritable originalWriteId = new LongWritable(-1); private final IntWritable bucket = new IntWritable(); private final LongWritable rowId = new LongWritable(); private long insertedRows = 0; @@ -112,12 +112,12 @@ public class OrcRecordUpdater implements RecordUpdater { private KeyIndexBuilder deleteEventIndexBuilder; private StructField recIdField = null; // field to look for the record identifier in private StructField rowIdField = null; // field inside recId to look for row id in - private StructField originalTxnField = null; // field inside recId to look for original txn in + private StructField originalWriteIdField = null; // field inside recId to look for original write id in private StructField bucketField = null; // field inside recId to look for bucket in private StructObjectInspector rowInspector; // OI for the original row private StructObjectInspector recIdInspector; // OI for the record identifier struct private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier - private LongObjectInspector origTxnInspector; // OI for the original txn inside the record + private LongObjectInspector origWriteIdInspector; // OI for the original write id inside the record // identifer private IntObjectInspector bucketInspector; @@ -126,11 +126,11 @@ public class OrcRecordUpdater implements RecordUpdater { } static long getCurrentTransaction(OrcStruct struct) { - return ((LongWritable) struct.getFieldValue(CURRENT_TRANSACTION)).get(); + return ((LongWritable) struct.getFieldValue(CURRENT_WRITEID)).get(); } static long getOriginalTransaction(OrcStruct struct) { - return ((LongWritable) struct.getFieldValue(ORIGINAL_TRANSACTION)).get(); + return ((LongWritable) struct.getFieldValue(ORIGINAL_WRITEID)).get(); } static int getBucket(OrcStruct struct) { @@ -184,15 +184,13 @@ public class OrcRecordUpdater implements RecordUpdater { fields.add(new OrcStruct.Field("operation", PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION)); fields.add(new OrcStruct.Field("originalTransaction", - PrimitiveObjectInspectorFactory.writableLongObjectInspector, - ORIGINAL_TRANSACTION)); + PrimitiveObjectInspectorFactory.writableLongObjectInspector, ORIGINAL_WRITEID)); fields.add(new OrcStruct.Field("bucket", PrimitiveObjectInspectorFactory.writableIntObjectInspector, BUCKET)); fields.add(new OrcStruct.Field("rowId", PrimitiveObjectInspectorFactory.writableLongObjectInspector, ROW_ID)); fields.add(new OrcStruct.Field("currentTransaction", - PrimitiveObjectInspectorFactory.writableLongObjectInspector, - CURRENT_TRANSACTION)); + PrimitiveObjectInspectorFactory.writableLongObjectInspector, CURRENT_WRITEID)); fields.add(new OrcStruct.Field("row", rowInspector, ROW)); return new OrcStruct.OrcStructInspector(fields); } @@ -246,7 +244,7 @@ public class OrcRecordUpdater implements RecordUpdater { } } } - if (options.getMinimumTransactionId() != options.getMaximumTransactionId() + if (options.getMinimumWriteId() != options.getMaximumWriteId() && !options.isWritingBase()){ //throw if file already exists as that should never happen flushLengths = fs.create(OrcAcidUtils.getSideFile(this.path), false, 8, @@ -316,8 +314,8 @@ public class OrcRecordUpdater implements RecordUpdater { options.getRecordIdColumn()))); item = new OrcStruct(FIELDS); item.setFieldValue(OPERATION, operation); - item.setFieldValue(CURRENT_TRANSACTION, currentTransaction); - item.setFieldValue(ORIGINAL_TRANSACTION, originalTransaction); + item.setFieldValue(CURRENT_WRITEID, currentWriteId); + item.setFieldValue(ORIGINAL_WRITEID, originalWriteId); item.setFieldValue(BUCKET, bucket); item.setFieldValue(ROW_ID, rowId); } @@ -342,9 +340,9 @@ public class OrcRecordUpdater implements RecordUpdater { List<? extends StructField> fields = ((StructObjectInspector) recIdField.getFieldObjectInspector()).getAllStructFieldRefs(); // Go by position, not field name, as field names aren't guaranteed. The order of fields - // in RecordIdentifier is transactionId, bucketId, rowId - originalTxnField = fields.get(0); - origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector(); + // in RecordIdentifier is writeId, bucketId, rowId + originalWriteIdField = fields.get(0); + origWriteIdInspector = (LongObjectInspector)originalWriteIdField.getFieldObjectInspector(); bucketField = fields.get(1); bucketInspector = (IntObjectInspector) bucketField.getFieldObjectInspector(); rowIdField = fields.get(2); @@ -361,27 +359,27 @@ public class OrcRecordUpdater implements RecordUpdater { * thus even for unbucketed tables, the N in bucket_N file name matches writerId/bucketId even for * late split */ - private void addSimpleEvent(int operation, long currentTransaction, long rowId, Object row) + private void addSimpleEvent(int operation, long currentWriteId, long rowId, Object row) throws IOException { this.operation.set(operation); - this.currentTransaction.set(currentTransaction); + this.currentWriteId.set(currentWriteId); Integer currentBucket = null; - // If this is an insert, originalTransaction should be set to this transaction. If not, + // If this is an insert, originalWriteId should be set to this transaction. If not, // it will be reset by the following if anyway. - long originalTransaction = currentTransaction; + long originalWriteId = currentWriteId; if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) { Object rowIdValue = rowInspector.getStructFieldData(row, recIdField); - originalTransaction = origTxnInspector.get( - recIdInspector.getStructFieldData(rowIdValue, originalTxnField)); + originalWriteId = origWriteIdInspector.get( + recIdInspector.getStructFieldData(rowIdValue, originalWriteIdField)); rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField)); currentBucket = setBucket(bucketInspector.get( recIdInspector.getStructFieldData(rowIdValue, bucketField)), operation); } this.rowId.set(rowId); - this.originalTransaction.set(originalTransaction); + this.originalWriteId.set(originalWriteId); item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row)); - indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId); + indexBuilder.addKey(operation, originalWriteId, bucket.get(), rowId); if (writer == null) { writer = OrcFile.createWriter(path, writerOptions); } @@ -389,18 +387,18 @@ public class OrcRecordUpdater implements RecordUpdater { restoreBucket(currentBucket, operation); } - private void addSplitUpdateEvent(int operation, long currentTransaction, long rowId, Object row) + private void addSplitUpdateEvent(int operation, long currentWriteId, long rowId, Object row) throws IOException { if (operation == INSERT_OPERATION) { // Just insert the record in the usual way, i.e., default to the simple behavior. - addSimpleEvent(operation, currentTransaction, rowId, row); + addSimpleEvent(operation, currentWriteId, rowId, row); return; } this.operation.set(operation); - this.currentTransaction.set(currentTransaction); + this.currentWriteId.set(currentWriteId); Object rowValue = rowInspector.getStructFieldData(row, recIdField); - long originalTransaction = origTxnInspector.get( - recIdInspector.getStructFieldData(rowValue, originalTxnField)); + long originalWriteId = origWriteIdInspector.get( + recIdInspector.getStructFieldData(rowValue, originalWriteIdField)); rowId = rowIdInspector.get( recIdInspector.getStructFieldData(rowValue, rowIdField)); Integer currentBucket = null; @@ -423,54 +421,54 @@ public class OrcRecordUpdater implements RecordUpdater { // A delete/update generates a delete event for the original row. this.rowId.set(rowId); - this.originalTransaction.set(originalTransaction); + this.originalWriteId.set(originalWriteId); item.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(DELETE_OPERATION)); item.setFieldValue(OrcRecordUpdater.ROW, null); // ROW is null for delete events. - deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalTransaction, bucket.get(), rowId); + deleteEventIndexBuilder.addKey(DELETE_OPERATION, originalWriteId, bucket.get(), rowId); deleteEventWriter.addRow(item); restoreBucket(currentBucket, operation); } if (operation == UPDATE_OPERATION) { // A new row is also inserted in the usual delta file for an update event. - addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + addSimpleEvent(INSERT_OPERATION, currentWriteId, insertedRows++, row); } } @Override - public void insert(long currentTransaction, Object row) throws IOException { - if (this.currentTransaction.get() != currentTransaction) { + public void insert(long currentWriteId, Object row) throws IOException { + if (this.currentWriteId.get() != currentWriteId) { insertedRows = 0; } if (acidOperationalProperties.isSplitUpdate()) { - addSplitUpdateEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + addSplitUpdateEvent(INSERT_OPERATION, currentWriteId, insertedRows++, row); } else { - addSimpleEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row); + addSimpleEvent(INSERT_OPERATION, currentWriteId, insertedRows++, row); } rowCountDelta++; } @Override - public void update(long currentTransaction, Object row) throws IOException { - if (this.currentTransaction.get() != currentTransaction) { + public void update(long currentWriteId, Object row) throws IOException { + if (this.currentWriteId.get() != currentWriteId) { insertedRows = 0; } if (acidOperationalProperties.isSplitUpdate()) { - addSplitUpdateEvent(UPDATE_OPERATION, currentTransaction, -1L, row); + addSplitUpdateEvent(UPDATE_OPERATION, currentWriteId, -1L, row); } else { - addSimpleEvent(UPDATE_OPERATION, currentTransaction, -1L, row); + addSimpleEvent(UPDATE_OPERATION, currentWriteId, -1L, row); } } @Override - public void delete(long currentTransaction, Object row) throws IOException { - if (this.currentTransaction.get() != currentTransaction) { + public void delete(long currentWriteId, Object row) throws IOException { + if (this.currentWriteId.get() != currentWriteId) { insertedRows = 0; } if (acidOperationalProperties.isSplitUpdate()) { - addSplitUpdateEvent(DELETE_OPERATION, currentTransaction, -1L, row); + addSplitUpdateEvent(DELETE_OPERATION, currentWriteId, -1L, row); } else { - addSimpleEvent(DELETE_OPERATION, currentTransaction, -1L, row); + addSimpleEvent(DELETE_OPERATION, currentWriteId, -1L, row); } rowCountDelta--; }