Murtadha Hubail has submitted this change and it was merged.
Change subject: ASTERIXDB-969: Redesigned recovery analysis phase to spill to disk
......................................................................
ASTERIXDB-969: Redesigned recovery analysis phase to spill to disk Change-Id: Ide2b346c2ad498d7595e71bae890362c2143d301 Reviewed-on: https://asterix-gerrit.ics.uci.edu/458 Tested-by: Jenkins <[email protected]> Reviewed-by: Young-Seok Kim <[email protected]> --- M asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java M asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java M asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java 4 files changed, 589 insertions(+), 338 deletions(-) Approvals: Young-Seok Kim: Looks good to me, approved Jenkins: Verified diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java index 79d45e4..4bcbada 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/DatasetId.java @@ -25,6 +25,11 @@ * */ private static final long serialVersionUID = 1L; + /** + * The number of bytes used to represent {@link DatasetId} value. + */ + public static final int BYTES = Integer.BYTES; + int id; public DatasetId(int id) { diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java index 3c80e67..046487a 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/JobId.java @@ -22,6 +22,10 @@ public class JobId implements Serializable { private static final long serialVersionUID = 1L; + /** + * The number of bytes used to represent {@link JobId} value. 
+ */ + public static final int BYTES = Integer.BYTES; private int id; diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java index 60e3097..a510e51 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java @@ -101,21 +101,19 @@ checksumGen = new CRC32(); } - private final static int TYPE_LEN = Byte.SIZE/Byte.SIZE; - private final static int JID_LEN = Integer.SIZE / Byte.SIZE; - private final static int DSID_LEN = Integer.SIZE / Byte.SIZE; - private final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE; - private final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE; + private final static int TYPE_LEN = Byte.SIZE / Byte.SIZE; + public final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE; + public final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE; private final static int PRVLSN_LEN = Long.SIZE / Byte.SIZE; private final static int RSID_LEN = Long.SIZE / Byte.SIZE; private final static int LOGRCD_SZ_LEN = Integer.SIZE / Byte.SIZE; private final static int FLDCNT_LEN = Integer.SIZE / Byte.SIZE; - private final static int NEWOP_LEN = Byte.SIZE/Byte.SIZE; + private final static int NEWOP_LEN = Byte.SIZE / Byte.SIZE; private final static int NEWVALSZ_LEN = Integer.SIZE / Byte.SIZE; private final static int CHKSUM_LEN = Long.SIZE / Byte.SIZE; - private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JID_LEN; - private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DSID_LEN + PKHASH_LEN + PKSZ_LEN; + private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JobId.BYTES; + private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN; private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN; private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + 
NEWOP_LEN + NEWVALSZ_LEN; @@ -142,11 +140,11 @@ buffer.putInt(newValueSize); writeTuple(buffer, newValue, newValueSize); } - + if (logType == LogType.FLUSH) { buffer.putInt(datasetId); } - + checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN); buffer.putLong(checksum); } @@ -174,20 +172,19 @@ public RECORD_STATUS readLogRecord(ByteBuffer buffer) { int beginOffset = buffer.position(); //first we need the logtype and Job ID, if the buffer isn't that big, then no dice. - if(buffer.remaining() < ALL_RECORD_HEADER_LEN) { + if (buffer.remaining() < ALL_RECORD_HEADER_LEN) { buffer.position(beginOffset); return RECORD_STATUS.TRUNCATED; } logType = buffer.get(); jobId = buffer.getInt(); - if(logType != LogType.FLUSH) - { + if (logType != LogType.FLUSH) { if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) { datasetId = -1; PKHashValue = -1; } else { //attempt to read in the dsid, PK hash and PK length - if(buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN){ + if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) { buffer.position(beginOffset); return RECORD_STATUS.TRUNCATED; } @@ -195,7 +192,7 @@ PKHashValue = buffer.getInt(); PKValueSize = buffer.getInt(); //attempt to read in the PK - if(buffer.remaining() < PKValueSize){ + if (buffer.remaining() < PKValueSize) { buffer.position(beginOffset); return RECORD_STATUS.TRUNCATED; } @@ -206,7 +203,7 @@ } if (logType == LogType.UPDATE) { //attempt to read in the previous LSN, log size, new value size, and new record type - if(buffer.remaining() <UPDATE_LSN_HEADER + UPDATE_BODY_HEADER){ + if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) { buffer.position(beginOffset); return RECORD_STATUS.TRUNCATED; } @@ -216,7 +213,7 @@ fieldCnt = buffer.getInt(); newOp = buffer.get(); newValueSize = buffer.getInt(); - if(buffer.remaining() < newValueSize){ + if (buffer.remaining() < newValueSize) { buffer.position(beginOffset); return RECORD_STATUS.TRUNCATED; } @@ -224,10 +221,9 @@ } else { 
computeAndSetLogSize(); } - } - else{ + } else { computeAndSetLogSize(); - if(buffer.remaining() < DSID_LEN){ + if (buffer.remaining() < DatasetId.BYTES) { buffer.position(beginOffset); return RECORD_STATUS.TRUNCATED; } @@ -235,7 +231,7 @@ resourceId = 0l; } //atempt to read checksum - if(buffer.remaining() < CHKSUM_LEN){ + if (buffer.remaining() < CHKSUM_LEN) { buffer.position(beginOffset); return RECORD_STATUS.TRUNCATED; } @@ -274,7 +270,7 @@ this.PKHashValue = -1; computeAndSetLogSize(); } - + public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker) { this.logType = LogType.FLUSH; this.jobId = -1; diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java index 82ad32d..fec04e4 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java @@ -24,9 +24,15 @@ import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -45,17 +51,21 @@ import org.apache.asterix.common.context.DatasetLifecycleManager; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider; import 
org.apache.asterix.common.transactions.ILogReader; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants; import org.apache.asterix.transaction.management.service.transaction.TransactionManager; import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; +import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; @@ -83,6 +93,11 @@ private final LogManager logMgr; private final int checkpointHistory; private final long SHARP_CHECKPOINT_LSN = -1; + private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp"; + private static final long MEGABYTE = 1024L * 1024L; + private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null; + private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //2MB; + /** * A file at a known location that contains the LSN of the last log record * traversed doing a successful checkpoint. @@ -105,8 +120,7 @@ * not supported, yet. */ public SystemState getSystemState() throws ACIDException { - - //#. 
read checkpoint file + //read checkpoint file CheckpointObject checkpointObject = null; try { checkpointObject = readCheckpoint(); @@ -143,6 +157,8 @@ } public void startRecovery(boolean synchronous) throws IOException, ACIDException { + //delete any recovery files from previous failed recovery attempts + deleteRecoveryTemporaryFiles(); int updateLogCount = 0; int entityCommitLogCount = 0; @@ -152,18 +168,13 @@ int jobId = -1; state = SystemState.RECOVERING; - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("[RecoveryMgr] starting recovery ..."); - } + LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ..."); Set<Integer> winnerJobSet = new HashSet<Integer>(); - Map<Integer, Set<TxnId>> jobId2WinnerEntitiesMap = new HashMap<Integer, Set<TxnId>>(); - //winnerEntity is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId> - Set<TxnId> winnerEntitySet = null; - TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); - TxnId winnerEntity = null; + jobId2WinnerEntitiesMap = new HashMap<>(); + TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); + JobEntityCommits jobEntityWinners = null; //#. read checkpoint file and set lowWaterMark where anaylsis and redo start long readableSmallestLSN = logMgr.getReadableSmallestLSN(); CheckpointObject checkpointObject = readCheckpoint(); @@ -177,215 +188,224 @@ // [ analysis phase ] // - collect all committed Lsn //------------------------------------------------------------------------- - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("[RecoveryMgr] in analysis phase"); - } + LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase"); //#. 
set log reader to the lowWaterMarkLsn ILogReader logReader = logMgr.getLogReader(true); - logReader.initializeScan(lowWaterMarkLSN); - ILogRecord logRecord = logReader.next(); - while (logRecord != null) { - if (IS_DEBUG_MODE) { - LOGGER.info(logRecord.getLogRecordForDisplay()); - } - //update max jobId - if (logRecord.getJobId() > maxJobId) { - maxJobId = logRecord.getJobId(); - } - switch (logRecord.getLogType()) { - case LogType.UPDATE: - updateLogCount++; - break; - case LogType.JOB_COMMIT: - winnerJobSet.add(Integer.valueOf(logRecord.getJobId())); - jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId())); - jobCommitLogCount++; - break; - case LogType.ENTITY_COMMIT: - jobId = logRecord.getJobId(); - winnerEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), - logRecord.getPKValue(), logRecord.getPKValueSize(), true); - if (!jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) { - winnerEntitySet = new HashSet<TxnId>(); - jobId2WinnerEntitiesMap.put(Integer.valueOf(jobId), winnerEntitySet); - } else { - winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId)); - } - winnerEntitySet.add(winnerEntity); - entityCommitLogCount++; - break; - case LogType.ABORT: - abortLogCount++; - break; - case LogType.FLUSH: - break; - default: - throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); - } + ILogRecord logRecord = null; + try { + logReader.initializeScan(lowWaterMarkLSN); logRecord = logReader.next(); - } - - //------------------------------------------------------------------------- - // [ redo phase ] - // - redo if - // 1) The TxnId is committed && --> guarantee durability - // 2) lsn > maxDiskLastLsn of the index --> guarantee idempotence - //------------------------------------------------------------------------- - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("[RecoveryMgr] in redo phase"); - } - long resourceId; - long maxDiskLastLsn; - long LSN = -1; - ILSMIndex index = 
null; - LocalResource localResource = null; - ILocalResourceMetadata localResourceMetadata = null; - Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>(); - boolean foundWinner = false; - - //#. get indexLifeCycleManager - IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); - IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager(); - ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository(); - - Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository) - .loadAndGetAllResources(); - //#. set log reader to the lowWaterMarkLsn again. - logReader.initializeScan(lowWaterMarkLSN); - logRecord = logReader.next(); - while (logRecord != null) { - if (IS_DEBUG_MODE) { - LOGGER.info(logRecord.getLogRecordForDisplay()); - } - LSN = logRecord.getLSN(); - jobId = logRecord.getJobId(); - foundWinner = false; - switch (logRecord.getLogType()) { - case LogType.UPDATE: - if (winnerJobSet.contains(Integer.valueOf(jobId))) { - foundWinner = true; - } else if (jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) { - winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId)); - tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), - logRecord.getPKValue(), logRecord.getPKValueSize()); - if (winnerEntitySet.contains(tempKeyTxnId)) { - foundWinner = true; + while (logRecord != null) { + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } + //update max jobId + if (logRecord.getJobId() > maxJobId) { + maxJobId = logRecord.getJobId(); + } + switch (logRecord.getLogType()) { + case LogType.UPDATE: + updateLogCount++; + break; + case LogType.JOB_COMMIT: + jobId = logRecord.getJobId(); + winnerJobSet.add(jobId); + if (jobId2WinnerEntitiesMap.containsKey(jobId)) { + jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + //to delete any 
spilled files as well + jobEntityWinners.clear(); + jobId2WinnerEntitiesMap.remove(jobId); } - } - if (foundWinner) { - resourceId = logRecord.getResourceId(); - localResource = resourcesMap.get(resourceId); - - /******************************************************************* - * [Notice] - * -> Issue - * Delete index may cause a problem during redo. - * The index operation to be redone couldn't be redone because the corresponding index - * may not exist in NC due to the possible index drop DDL operation. - * -> Approach - * Avoid the problem during redo. - * More specifically, the problem will be detected when the localResource of - * the corresponding index is retrieved, which will end up with 'null'. - * If null is returned, then just go and process the next - * log record. - *******************************************************************/ - if (localResource == null) { - logRecord = logReader.next(); - continue; - } - /*******************************************************************/ - - //get index instance from IndexLifeCycleManager - //if index is not registered into IndexLifeCycleManager, - //create the index using LocalMetadata stored in LocalResourceRepository - index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName()); - if (index == null) { - //#. create index instance and register to indexLifeCycleManager - localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject(); - index = localResourceMetadata.createIndexInstance(appRuntimeContext, - localResource.getResourceName(), localResource.getPartition()); - indexLifecycleManager.register(localResource.getResourceName(), index); - indexLifecycleManager.open(localResource.getResourceName()); - - //#. get maxDiskLastLSN - ILSMIndex lsmIndex = (ILSMIndex) index; - maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()) - .getComponentLSN(lsmIndex.getImmutableComponents()); - - //#. 
set resourceId and maxDiskLastLSN to the map - resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn)); + jobCommitLogCount++; + break; + case LogType.ENTITY_COMMIT: + jobId = logRecord.getJobId(); + if (!jobId2WinnerEntitiesMap.containsKey(jobId)) { + jobEntityWinners = new JobEntityCommits(jobId); + if (needToFreeMemory()) { + //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk. + //This could happen only when we have many jobs with small number of records and none of them have job commit. + freeJobsCachedEntities(jobId); + } + jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners); } else { - maxDiskLastLsn = resourceId2MaxLSNMap.get(Long.valueOf(resourceId)); + jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); } - - if (LSN > maxDiskLastLsn) { - redo(logRecord); - redoCount++; - } - } - break; - - case LogType.JOB_COMMIT: - case LogType.ENTITY_COMMIT: - case LogType.ABORT: - case LogType.FLUSH: - - //do nothing - break; - - default: - throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); + jobEntityWinners.add(logRecord); + entityCommitLogCount++; + break; + case LogType.ABORT: + abortLogCount++; + break; + case LogType.FLUSH: + break; + default: + throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); + } + logRecord = logReader.next(); } + + //prepare winners for search after analysis is done to flush anything remaining in memory to disk. 
+ for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) { + winners.prepareForSearch(); + } + //------------------------------------------------------------------------- + // [ redo phase ] + // - redo if + // 1) The TxnId is committed && --> guarantee durability + // 2) lsn > maxDiskLastLsn of the index --> guarantee idempotence + //------------------------------------------------------------------------- + LOGGER.info("[RecoveryMgr] in redo phase"); + + long resourceId; + long maxDiskLastLsn; + long LSN = -1; + ILSMIndex index = null; + LocalResource localResource = null; + ILocalResourceMetadata localResourceMetadata = null; + Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>(); + boolean foundWinner = false; + + IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); + IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager(); + ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository(); + Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository) + .loadAndGetAllResources(); + + //set log reader to the lowWaterMarkLsn again. 
+ logReader.initializeScan(lowWaterMarkLSN); logRecord = logReader.next(); - } + while (logRecord != null) { + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } + LSN = logRecord.getLSN(); + jobId = logRecord.getJobId(); + foundWinner = false; + switch (logRecord.getLogType()) { + case LogType.UPDATE: + if (winnerJobSet.contains(jobId)) { + foundWinner = true; + } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) { + jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize()); + if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) { + foundWinner = true; + } + } + if (foundWinner) { + resourceId = logRecord.getResourceId(); + localResource = resourcesMap.get(resourceId); + /******************************************************************* + * [Notice] + * -> Issue + * Delete index may cause a problem during redo. + * The index operation to be redone couldn't be redone because the corresponding index + * may not exist in NC due to the possible index drop DDL operation. + * -> Approach + * Avoid the problem during redo. + * More specifically, the problem will be detected when the localResource of + * the corresponding index is retrieved, which will end up with 'null'. + * If null is returned, then just go and process the next + * log record. 
+ *******************************************************************/ + if (localResource == null) { + logRecord = logReader.next(); + continue; + } + /*******************************************************************/ - //close all indexes - Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet(); - for (long r : resourceIdList) { - indexLifecycleManager.close(resourcesMap.get(r).getResourceName()); - } + //get index instance from IndexLifeCycleManager + //if index is not registered into IndexLifeCycleManager, + //create the index using LocalMetadata stored in LocalResourceRepository + index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName()); + if (index == null) { + //#. create index instance and register to indexLifeCycleManager + localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject(); + index = localResourceMetadata.createIndexInstance(appRuntimeContext, + localResource.getResourceName(), localResource.getPartition()); + indexLifecycleManager.register(localResource.getResourceName(), index); + indexLifecycleManager.open(localResource.getResourceName()); - logReader.close(); + //#. get maxDiskLastLSN + ILSMIndex lsmIndex = (ILSMIndex) index; + maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()) + .getComponentLSN(lsmIndex.getImmutableComponents()); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("[RecoveryMgr] recovery is completed."); - LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = " - + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + "/" - + redoCount); + //#. 
set resourceId and maxDiskLastLSN to the map + resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn); + } else { + maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId); + } + + if (LSN > maxDiskLastLsn) { + redo(logRecord); + redoCount++; + } + } + break; + case LogType.JOB_COMMIT: + case LogType.ENTITY_COMMIT: + case LogType.ABORT: + case LogType.FLUSH: + //do nothing + break; + default: + throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); + } + logRecord = logReader.next(); + } + + //close all indexes + Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet(); + for (long r : resourceIdList) { + indexLifecycleManager.close(resourcesMap.get(r).getResourceName()); + } + + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("[RecoveryMgr] recovery is completed."); + LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = " + + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + + "/" + redoCount); + } + } finally { + logReader.close(); + //delete any recovery files after completing recovery + deleteRecoveryTemporaryFiles(); } + } + + private static boolean needToFreeMemory() { + return Runtime.getRuntime().freeMemory() < MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE; } @Override public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException { - long minMCTFirstLSN; boolean nonSharpCheckpointSucceeded = false; - if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Starting sharp checkpoint ... "); + if (isSharpCheckpoint) { + LOGGER.log(Level.INFO, "Starting sharp checkpoint ... "); } TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager(); String logDir = logMgr.getLogManagerProperties().getLogDir(); - //#. get the filename of the previous checkpoint files which are about to be deleted - // right after the new checkpoint file is written. 
+ //get the filename of the previous checkpoint files which are about to be deleted + //right after the new checkpoint file is written. File[] prevCheckpointFiles = getPreviousCheckpointFiles(); DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager) txnSubsystem .getAsterixAppRuntimeContextProvider().getIndexLifecycleManager(); - //#. flush all in-memory components if it is the sharp checkpoint + //flush all in-memory components if it is the sharp checkpoint if (isSharpCheckpoint) { - datasetLifecycleManager.flushAllDatasets(); - minMCTFirstLSN = SHARP_CHECKPOINT_LSN; } else { - minMCTFirstLSN = getMinFirstLSN(); - if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) { nonSharpCheckpointSucceeded = true; } else { @@ -400,7 +420,7 @@ FileOutputStream fos = null; ObjectOutputStream oosToFos = null; try { - String fileName = getFileName(logDir, Long.toString(checkpointObject.getTimeStamp())); + String fileName = getCheckpointFileName(logDir, Long.toString(checkpointObject.getTimeStamp())); fos = new FileOutputStream(fileName); oosToFos = new ObjectOutputStream(fos); oosToFos.writeObject(checkpointObject); @@ -460,11 +480,8 @@ long firstLSN; //the min first lsn can only be the current append or smaller long minFirstLSN = logMgr.getAppendLSN(); - if (openIndexList.size() > 0) { - for (IIndex index : openIndexList) { - AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index) .getIOOperationCallback(); if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) { @@ -473,23 +490,19 @@ } } } - return minFirstLSN; } private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException { - CheckpointObject checkpointObject = null; - //#. 
read all checkpointObjects from the existing checkpoint files + //read all checkpointObjects from the existing checkpoint files File[] prevCheckpointFiles = getPreviousCheckpointFiles(); - if (prevCheckpointFiles == null || prevCheckpointFiles.length == 0) { throw new FileNotFoundException("Checkpoint file is not found"); } List<CheckpointObject> checkpointObjectList = new ArrayList<CheckpointObject>(); - for (File file : prevCheckpointFiles) { FileInputStream fis = null; ObjectInputStream oisFromFis = null; @@ -528,7 +541,6 @@ private File[] getPreviousCheckpointFiles() { String logDir = ((LogManager) txnSubsystem.getLogManager()).getLogManagerProperties().getLogDir(); - File parentDir = new File(logDir); FilenameFilter filter = new FilenameFilter() { @@ -542,18 +554,59 @@ } }; - File[] prevCheckpointFiles = parentDir.listFiles(filter); - - return prevCheckpointFiles; + return parentDir.listFiles(filter); } - private String getFileName(String baseDir, String suffix) { - + private String getCheckpointFileName(String baseDir, String suffix) { if (!baseDir.endsWith(System.getProperty("file.separator"))) { baseDir += System.getProperty("file.separator"); } - return baseDir + CHECKPOINT_FILENAME_PREFIX + suffix; + } + + private File createJobRecoveryFile(int jobId, String fileName) throws IOException { + String recoveryDirPath = getRecoveryDirPath(); + Path JobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + jobId); + if (!Files.exists(JobRecoveryFolder)) { + Files.createDirectories(JobRecoveryFolder); + } + + File jobRecoveryFile = new File(JobRecoveryFolder.toString() + File.separator + fileName); + if (!jobRecoveryFile.exists()) { + jobRecoveryFile.createNewFile(); + } else { + throw new IOException("File: " + fileName + " for job id(" + jobId + ") already exists"); + } + + return jobRecoveryFile; + } + + private void deleteRecoveryTemporaryFiles() throws IOException { + String recoveryDirPath = getRecoveryDirPath(); + Path recoveryFolderPath = 
Paths.get(recoveryDirPath); + if (Files.exists(recoveryFolderPath)) { + FileUtils.deleteDirectory(recoveryFolderPath.toFile()); + } + } + + private String getRecoveryDirPath() { + String logDir = logMgr.getLogManagerProperties().getLogDir(); + if (!logDir.endsWith(File.separator)) { + logDir += File.separator; + } + + return logDir + RECOVERY_FILES_DIR_NAME; + } + + private void freeJobsCachedEntities(int requestingJobId) throws IOException { + if (jobId2WinnerEntitiesMap != null) { + for (Entry<Integer, JobEntityCommits> jobEntityCommits : jobId2WinnerEntitiesMap.entrySet()) { + //if the job is not the requester, free its memory + if (jobEntityCommits.getKey() != requestingJobId) { + jobEntityCommits.getValue().spillToDiskAndfreeMemory(); + } + } + } } /** @@ -563,136 +616,121 @@ */ @Override public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException { - Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>(); - TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); - - int updateLogCount = 0; - int entityCommitLogCount = 0; - int jobId = -1; int abortedJobId = txnContext.getJobId().getId(); - long currentLSN = -1; - TxnId loserEntity = null; - // Obtain the first/last log record LSNs written by the Job long firstLSN = txnContext.getFirstLSN(); long lastLSN = txnContext.getLastLSN(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info(" rollbacking transaction log records from " + firstLSN + " to " + lastLSN); - } + LOGGER.log(Level.INFO, "rollbacking transaction log records from " + firstLSN + " to " + lastLSN); // check if the transaction actually wrote some logs. 
if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info(" no need to roll back as there were no operations by the transaction " - + txnContext.getJobId()); - } + LOGGER.log(Level.INFO, + "no need to roll back as there were no operations by the transaction " + txnContext.getJobId()); return; } // While reading log records from firstLsn to lastLsn, collect uncommitted txn's Lsns - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info(" collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN); - } + LOGGER.log(Level.INFO, "collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN); + + Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<TxnId, List<Long>>(); + TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); + int updateLogCount = 0; + int entityCommitLogCount = 0; + int logJobId = -1; + long currentLSN = -1; + TxnId loserEntity = null; List<Long> undoLSNSet = null; + ILogReader logReader = logMgr.getLogReader(false); - logReader.initializeScan(firstLSN); - ILogRecord logRecord = null; - - while (currentLSN < lastLSN) { - logRecord = logReader.next(); - if (logRecord == null) { - break; - } else { - if (IS_DEBUG_MODE) { - LOGGER.info(logRecord.getLogRecordForDisplay()); - } - currentLSN = logRecord.getLSN(); - } - jobId = logRecord.getJobId(); - if (jobId != abortedJobId) { - continue; - } - tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(), - logRecord.getPKValueSize()); - switch (logRecord.getLogType()) { - case LogType.UPDATE: - undoLSNSet = loserTxnTable.get(tempKeyTxnId); - if (undoLSNSet == null) { - loserEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), - logRecord.getPKValue(), logRecord.getPKValueSize(), true); - undoLSNSet = new LinkedList<Long>(); - loserTxnTable.put(loserEntity, undoLSNSet); - } - undoLSNSet.add(Long.valueOf(currentLSN)); - 
updateLogCount++; - if (IS_DEBUG_MODE) { - LOGGER.info("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:" - + tempKeyTxnId); - } - break; - - case LogType.JOB_COMMIT: - throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort."); - - case LogType.ENTITY_COMMIT: - undoLSNSet = loserTxnTable.remove(tempKeyTxnId); - if (undoLSNSet == null) { - undoLSNSet = loserTxnTable.remove(tempKeyTxnId); - } - entityCommitLogCount++; - if (IS_DEBUG_MODE) { - LOGGER.info("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]" - + tempKeyTxnId); - } - break; - - case LogType.ABORT: - case LogType.FLUSH: - //ignore - break; - - default: - throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); - } - } - if (currentLSN != lastLSN) { - throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN - + ") during abort( " + txnContext.getJobId() + ")"); - } - - //undo loserTxn's effect - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info(" undoing loser transaction's effect"); - } - - Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator(); - int undoCount = 0; - while (iter.hasNext()) { - //TODO - //Sort the lsns in order to undo in one pass. - - Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next(); - undoLSNSet = loserTxn.getValue(); - - for (long undoLSN : undoLSNSet) { - //here, all the log records are UPDATE type. So, we don't need to check the type again. - //read the corresponding log record to be undone. 
- logRecord = logReader.read(undoLSN); + try { + logReader.initializeScan(firstLSN); + ILogRecord logRecord = null; + while (currentLSN < lastLSN) { + logRecord = logReader.next(); if (logRecord == null) { - throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")"); + break; + } else { + currentLSN = logRecord.getLSN(); + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } } - if (IS_DEBUG_MODE) { - LOGGER.info(logRecord.getLogRecordForDisplay()); + logJobId = logRecord.getJobId(); + if (logJobId != abortedJobId) { + continue; } - undo(logRecord); - undoCount++; + tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize()); + switch (logRecord.getLogType()) { + case LogType.UPDATE: + undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId); + if (undoLSNSet == null) { + loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize(), true); + undoLSNSet = new LinkedList<Long>(); + jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet); + } + undoLSNSet.add(currentLSN); + updateLogCount++; + if (IS_DEBUG_MODE) { + LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:" + + tempKeyTxnId); + } + break; + case LogType.ENTITY_COMMIT: + jobLoserEntity2LSNsMap.remove(tempKeyTxnId); + entityCommitLogCount++; + if (IS_DEBUG_MODE) { + LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]" + + tempKeyTxnId); + } + break; + case LogType.JOB_COMMIT: + throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort."); + case LogType.ABORT: + case LogType.FLUSH: + //ignore + break; + default: + throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); + } } - } - logReader.close(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info(" undone loser transaction's 
effect"); - LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/" - + entityCommitLogCount + "/" + undoCount); + if (currentLSN != lastLSN) { + throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN + + ") during abort( " + txnContext.getJobId() + ")"); + } + + //undo loserTxn's effect + LOGGER.log(Level.INFO, "undoing loser transaction's effect"); + + //TODO sort loser entities by smallest LSN to undo in one pass. + Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator(); + int undoCount = 0; + while (iter.hasNext()) { + Map.Entry<TxnId, List<Long>> loserEntity2LSNsMap = iter.next(); + undoLSNSet = loserEntity2LSNsMap.getValue(); + for (long undoLSN : undoLSNSet) { + //here, all the log records are UPDATE type. So, we don't need to check the type again. + //read the corresponding log record to be undone. + logRecord = logReader.read(undoLSN); + if (logRecord == null) { + throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")"); + } + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } + undo(logRecord); + undoCount++; + } + } + + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("undone loser transaction's effect"); + LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/" + + entityCommitLogCount + "/" + undoCount); + } + } finally { + logReader.close(); } } @@ -746,6 +784,175 @@ throw new IllegalStateException("Failed to redo", e); } } + + private class JobEntityCommits { + private static final String PARTITION_FILE_NAME_SEPARATOR = "_"; + private final int jobId; + private final Set<TxnId> cachedEntityCommitTxns = new HashSet<TxnId>(); + private final List<File> jobEntitCommitOnDiskPartitionsFiles = new ArrayList<File>(); + //a flag indicating whether all the the commits for this jobs have been added. 
+ private boolean preparedForSearch = false; + private TxnId winnerEntity = null; + private int currentPartitionSize = 0; + private long partitionMaxLSN = 0; + private String currentPartitonName; + + public JobEntityCommits(int jobId) { + this.jobId = jobId; + } + + public void add(ILogRecord logRecord) throws IOException { + if (preparedForSearch) { + throw new IOException("Cannot add new entity commits after preparing for search."); + } + winnerEntity = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize(), true); + cachedEntityCommitTxns.add(winnerEntity); + //since log file is read sequentially, LSNs are always increasing + partitionMaxLSN = logRecord.getLSN(); + currentPartitionSize += winnerEntity.getCurrentSize(); + //if the memory budget for the current partition exceeded the limit, spill it to disk and free memory + if (currentPartitionSize >= MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE) { + spillToDiskAndfreeMemory(); + } + } + + public void spillToDiskAndfreeMemory() throws IOException { + if (cachedEntityCommitTxns.size() > 0) { + if (!preparedForSearch) { + writeCurrentPartitionToDisk(); + } + cachedEntityCommitTxns.clear(); + partitionMaxLSN = 0; + currentPartitionSize = 0; + currentPartitonName = ""; + } + } + + /** + * Call this method when no more entity commits will be added to this job. + * + * @throws IOException + */ + public void prepareForSearch() throws IOException { + //if we have anything left in memory, we need to spill them to disk before searching other partitions. 
+ //However, if we don't have anything on disk, we will search from memory only + if (jobEntitCommitOnDiskPartitionsFiles.size() > 0) { + spillToDiskAndfreeMemory(); + } else { + //set the name of the current in memory partition to the current partition + currentPartitonName = getPartitionName(partitionMaxLSN); + } + preparedForSearch = true; + } + + public boolean containsEntityCommitForTxnId(long logLSN, TxnId txnId) throws IOException { + //if we don't have any partitions on disk, search only from memory + if (jobEntitCommitOnDiskPartitionsFiles.size() == 0) { + return cachedEntityCommitTxns.contains(txnId); + } else { + //get candidate partitions from disk + ArrayList<File> candidatePartitions = getCandidiatePartitions(logLSN); + for (File partition : candidatePartitions) { + if (serachPartition(partition, txnId)) { + return true; + } + } + } + return false; + } + + /** + * @param logLSN + * @return partitions that have a max LSN > logLSN + */ + public ArrayList<File> getCandidiatePartitions(long logLSN) { + ArrayList<File> candidiatePartitions = new ArrayList<File>(); + + for (File partition : jobEntitCommitOnDiskPartitionsFiles) { + String partitionName = partition.getName(); + //entity commit log must come after the update log, therefore, consider only partitions with max LSN > logLSN + if (getPartitionMaxLSNFromName(partitionName) > logLSN) { + candidiatePartitions.add(partition); + } + } + + return candidiatePartitions; + } + + public void clear() { + cachedEntityCommitTxns.clear(); + for (File partition : jobEntitCommitOnDiskPartitionsFiles) { + partition.delete(); + } + jobEntitCommitOnDiskPartitionsFiles.clear(); + } + + private boolean serachPartition(File partition, TxnId txnId) throws IOException { + //load partition from disk if it is not already in memory + if (!partition.getName().equals(currentPartitonName)) { + loadPartitionToMemory(partition, cachedEntityCommitTxns); + currentPartitonName = partition.getName(); + } + return 
cachedEntityCommitTxns.contains(txnId); + } + + private String getPartitionName(long maxLSN) { + return jobId + PARTITION_FILE_NAME_SEPARATOR + maxLSN; + } + + private long getPartitionMaxLSNFromName(String partitionName) { + return Long.valueOf(partitionName.substring(partitionName.indexOf(PARTITION_FILE_NAME_SEPARATOR) + 1)); + } + + private void writeCurrentPartitionToDisk() throws IOException { + //if we don't have enough memory to allocate for this partition, we will ask recovery manager to free memory + if (needToFreeMemory()) { + freeJobsCachedEntities(jobId); + } + //allocate a buffer that can hold the current partition + ByteBuffer buffer = ByteBuffer.allocate(currentPartitionSize); + for (Iterator<TxnId> iterator = cachedEntityCommitTxns.iterator(); iterator.hasNext();) { + TxnId txnId = iterator.next(); + //serialize the object and remove it from memory + txnId.serialize(buffer); + iterator.remove(); + } + //name partition file based on job id and max lsn + File partitionFile = createJobRecoveryFile(jobId, getPartitionName(partitionMaxLSN)); + //write file to disk + try (FileOutputStream fileOutputstream = new FileOutputStream(partitionFile, false); + FileChannel fileChannel = fileOutputstream.getChannel()) { + buffer.flip(); + while (buffer.hasRemaining()) { + fileChannel.write(buffer); + } + } + jobEntitCommitOnDiskPartitionsFiles.add(partitionFile); + } + + private void loadPartitionToMemory(File partition, Set<TxnId> partitionTxn) throws IOException { + partitionTxn.clear(); + //if we don't have enough memory to a load partition, we will ask recovery manager to free memory + if (needToFreeMemory()) { + freeJobsCachedEntities(jobId); + } + ByteBuffer buffer = ByteBuffer.allocateDirect((int) partition.length()); + //load partition to memory + try (InputStream is = new FileInputStream(partition)) { + int readByte; + while ((readByte = is.read()) != -1) { + buffer.put((byte) readByte); + } + } + buffer.flip(); + TxnId temp = null; + while 
(buffer.remaining() != 0) { + temp = TxnId.deserialize(buffer); + partitionTxn.add(temp); + } + } + } } class TxnId { @@ -772,6 +979,9 @@ } } + public TxnId() { + } + private void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) { int readOffset = pkValue.getFieldStart(0); byte[] readBuffer = pkValue.getFieldData(0); @@ -786,7 +996,7 @@ this.pkHashValue = pkHashValue; this.tupleReferencePKValue = pkValue; this.pkSize = pkSize; - isByteArrayPKValue = false; + this.isByteArrayPKValue = false; } @Override @@ -856,4 +1066,40 @@ } return true; } -} \ No newline at end of file + + public void serialize(ByteBuffer buffer) throws IOException { + buffer.putInt(jobId); + buffer.putInt(datasetId); + buffer.putInt(pkHashValue); + buffer.putInt(pkSize); + buffer.put((byte) (isByteArrayPKValue ? 1 : 0)); + if (isByteArrayPKValue) { + buffer.put(byteArrayPKValue); + } + } + + public static TxnId deserialize(ByteBuffer buffer) { + TxnId txnId = new TxnId(); + txnId.jobId = buffer.getInt(); + txnId.datasetId = buffer.getInt(); + txnId.pkHashValue = buffer.getInt(); + txnId.pkSize = buffer.getInt(); + txnId.isByteArrayPKValue = (buffer.get() == 1); + if (txnId.isByteArrayPKValue) { + byte[] byteArrayPKValue = new byte[txnId.pkSize]; + buffer.get(byteArrayPKValue); + txnId.byteArrayPKValue = byteArrayPKValue; + } + return txnId; + } + + public int getCurrentSize() { + //job id, dataset id, pkHashValue, arraySize, isByteArrayPKValue + int size = JobId.BYTES + DatasetId.BYTES + LogRecord.PKHASH_LEN + LogRecord.PKSZ_LEN + Byte.BYTES; + //byte arraySize + if (isByteArrayPKValue && byteArrayPKValue != null) { + size += byteArrayPKValue.length; + } + return size; + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/458 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ide2b346c2ad498d7595e71bae890362c2143d301 Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master 
Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Young-Seok Kim <[email protected]>
