Ian Maxon has submitted this change and it was merged. Change subject: ASTERIXDB-1045: Log analysis fixes ......................................................................
ASTERIXDB-1045: Log analysis fixes -Avoid using exceptions for control flow in LogRecord -Rename LogPage and ilk to LogBuffer -Busywait on read() to fill entire buffer for fillLogBuffer rather than failing -Distinguish between log truncation and checksum corruption TODOs: - Log IO and parsing still happen in lock-step. - Busywaiting for read to return something other than 0 is unfortunate Change-Id: I1658e938eb0f199f748407361ffee4833aac661c Reviewed-on: https://asterix-gerrit.ics.uci.edu/289 Tested-by: Jenkins <[email protected]> Reviewed-by: Young-Seok Kim <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- R asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java M asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java M asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java R asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java R asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java 8 files changed, 229 insertions(+), 97 deletions(-) Approvals: Young-Seok Kim: Looks good to me, but someone else must approve Murtadha Hubail: Looks good to me, approved Jenkins: Verified diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogPage.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java similarity index 96% rename from asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogPage.java rename to asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java index bd174b5..9e28cda 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogPage.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.common.transactions; -public interface ILogPage { +public interface ILogBuffer { public void append(ILogRecord logRecord, long appendLsn); diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java index 90595e3..16c51fe 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java @@ -28,8 +28,15 @@ public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25; public static final int UPDATE_LOG_BASE_SIZE = 54; public static final int FLUSH_LOG_SIZE = 17; - - public boolean readLogRecord(ByteBuffer buffer); + + + public enum RECORD_STATUS{ + TRUNCATED, + BAD_CHKSUM, + OK + } + + public LogRecord.RECORD_STATUS readLogRecord(ByteBuffer buffer); public void writeLogRecord(ByteBuffer buffer); diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java index c0b71e6..60e3097 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java @@ -83,7 +83,6 @@ //------------- fields in a log record (end) --------------// private int PKFieldCnt; - private static final int CHECKSUM_SIZE = 8; private ITransactionContext txnCtx; private long LSN; private final AtomicBoolean isFlushed; @@ -101,6 +100,24 @@ readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference(); checksumGen = new CRC32(); } + + private final static int TYPE_LEN = Byte.SIZE/Byte.SIZE; + private final static int JID_LEN = Integer.SIZE / Byte.SIZE; + private final static int DSID_LEN = Integer.SIZE / Byte.SIZE; + private final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE; + private final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE; + private final static int PRVLSN_LEN = Long.SIZE / Byte.SIZE; + private final static int RSID_LEN = Long.SIZE / Byte.SIZE; + private final static int LOGRCD_SZ_LEN = Integer.SIZE / Byte.SIZE; + private final static int FLDCNT_LEN = Integer.SIZE / Byte.SIZE; + private final static int NEWOP_LEN = Byte.SIZE/Byte.SIZE; + private final static int NEWVALSZ_LEN = Integer.SIZE / Byte.SIZE; + private final static int CHKSUM_LEN = Long.SIZE / Byte.SIZE; + + private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JID_LEN; + private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DSID_LEN + PKHASH_LEN + PKSZ_LEN; + private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN; + private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN; @Override public void writeLogRecord(ByteBuffer buffer) { @@ -130,7 +147,7 @@ buffer.putInt(datasetId); } - checksum = generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE); + checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN); buffer.putLong(checksum); } @@ -154,52 +171,79 @@ } @Override - public boolean readLogRecord(ByteBuffer buffer) { + public RECORD_STATUS readLogRecord(ByteBuffer buffer) { int beginOffset = buffer.position(); - try { - logType = buffer.get(); - jobId = buffer.getInt(); - if(logType != LogType.FLUSH) - { - if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) { - datasetId = -1; - PKHashValue = -1; - } else { - datasetId = buffer.getInt(); - PKHashValue = buffer.getInt(); - PKValueSize = buffer.getInt(); - if (PKValueSize <= 0) { - throw new IllegalStateException("Primary Key Size is less than or equal to 0"); - } - PKValue = readPKValue(buffer); - } - if (logType == LogType.UPDATE) { - prevLSN = buffer.getLong(); - resourceId = buffer.getLong(); - logSize = buffer.getInt(); - fieldCnt = buffer.getInt(); - newOp = buffer.get(); - newValueSize = buffer.getInt(); - newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize); - } else { - computeAndSetLogSize(); - } - } - else{ - computeAndSetLogSize(); - datasetId = buffer.getInt(); - resourceId = 0l; - } - - checksum = buffer.getLong(); - if (checksum != generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE)) { - throw new IllegalStateException(); - } - } catch (BufferUnderflowException e) { + //first we need the logtype and Job ID, if the buffer isn't that big, then no dice. + if(buffer.remaining() < ALL_RECORD_HEADER_LEN) { buffer.position(beginOffset); - return false; + return RECORD_STATUS.TRUNCATED; } - return true; + logType = buffer.get(); + jobId = buffer.getInt(); + if(logType != LogType.FLUSH) + { + if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) { + datasetId = -1; + PKHashValue = -1; + } else { + //attempt to read in the dsid, PK hash and PK length + if(buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN){ + buffer.position(beginOffset); + return RECORD_STATUS.TRUNCATED; + } + datasetId = buffer.getInt(); + PKHashValue = buffer.getInt(); + PKValueSize = buffer.getInt(); + //attempt to read in the PK + if(buffer.remaining() < PKValueSize){ + buffer.position(beginOffset); + return RECORD_STATUS.TRUNCATED; + } + if (PKValueSize <= 0) { + throw new IllegalStateException("Primary Key Size is less than or equal to 0"); + } + PKValue = readPKValue(buffer); + } + if (logType == LogType.UPDATE) { + //attempt to read in the previous LSN, log size, new value size, and new record type + if(buffer.remaining() <UPDATE_LSN_HEADER + UPDATE_BODY_HEADER){ + buffer.position(beginOffset); + return RECORD_STATUS.TRUNCATED; + } + prevLSN = buffer.getLong(); + resourceId = buffer.getLong(); + logSize = buffer.getInt(); + fieldCnt = buffer.getInt(); + newOp = buffer.get(); + newValueSize = buffer.getInt(); + if(buffer.remaining() < newValueSize){ + buffer.position(beginOffset); + return RECORD_STATUS.TRUNCATED; + } + newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize); + } else { + computeAndSetLogSize(); + } + } + else{ + computeAndSetLogSize(); + if(buffer.remaining() < DSID_LEN){ + buffer.position(beginOffset); + return RECORD_STATUS.TRUNCATED; + } + datasetId = buffer.getInt(); + resourceId = 0l; + } + //atempt to read checksum + if(buffer.remaining() < CHKSUM_LEN){ + buffer.position(beginOffset); + return RECORD_STATUS.TRUNCATED; + } + checksum = buffer.getLong(); + if (checksum != generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN)) { + return RECORD_STATUS.BAD_CHKSUM; + } + return RECORD_STATUS.OK; } private ITupleReference readPKValue(ByteBuffer buffer) { diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java index 6f36a6a..ee9fd84 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java @@ -37,8 +37,8 @@ import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.transaction.management.service.logging.LogPage; -import org.apache.asterix.transaction.management.service.logging.LogPageReader; +import org.apache.asterix.transaction.management.service.logging.LogBuffer; +import org.apache.asterix.transaction.management.service.logging.LogBufferTailReader; import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; @@ -2204,11 +2204,11 @@ } } - public void batchUnlock(LogPage logPage, LogPageReader logPageReader) throws ACIDException { + public void batchUnlock(LogBuffer logPage, LogBufferTailReader logBufferTailReader) throws ACIDException { latchLockTable(); try { ITransactionContext txnCtx = null; - LogRecord logRecord = logPageReader.next(); + LogRecord logRecord = logBufferTailReader.next(); while (logRecord != null) { if (logRecord.getLogType() == LogType.ENTITY_COMMIT) { tempDatasetIdObj.setId(logRecord.getDatasetId()); @@ -2222,7 +2222,7 @@ txnCtx.notifyOptracker(true); logPage.notifyJobTerminator(); } - logRecord = logPageReader.next(); + logRecord = logBufferTailReader.next(); } } finally { unlatchLockTable(); diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java similarity index 93% rename from asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPage.java rename to asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 53e17d4..4d50294 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPage.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -28,7 +28,7 @@ import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.DatasetId; -import org.apache.asterix.common.transactions.ILogPage; +import org.apache.asterix.common.transactions.ILogBuffer; import org.apache.asterix.common.transactions.ILogRecord; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.JobId; @@ -39,12 +39,12 @@ import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; import org.apache.hyracks.api.exceptions.HyracksDataException; -public class LogPage implements ILogPage { +public class LogBuffer implements ILogBuffer { public static final boolean IS_DEBUG_MODE = false;//true - private static final Logger LOGGER = Logger.getLogger(LogPage.class.getName()); + private static final Logger LOGGER = Logger.getLogger(LogBuffer.class.getName()); private final TransactionSubsystem txnSubsystem; - private final LogPageReader logPageReader; + private final LogBufferTailReader logBufferTailReader; private final int logPageSize; private final MutableLong flushLSN; private final AtomicBoolean full; @@ -62,14 +62,14 @@ private final DatasetId reusableDsId; private final JobId reusableJobId; - public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) { + public LogBuffer(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) { this.txnSubsystem = txnSubsystem; this.logPageSize = logPageSize; this.flushLSN = flushLSN; appendBuffer = ByteBuffer.allocate(logPageSize); flushBuffer = appendBuffer.duplicate(); unlockBuffer = appendBuffer.duplicate(); - logPageReader = getLogPageReader(); + logBufferTailReader = getLogBufferTailReader(); full = new AtomicBoolean(false); appendOffset = 0; flushOffset = 0; @@ -206,17 +206,17 @@ } } - private LogPageReader getLogPageReader() { - return new LogPageReader(unlockBuffer); + private LogBufferTailReader getLogBufferTailReader() { + return new LogBufferTailReader(unlockBuffer); } private void batchUnlock(int beginOffset, int endOffset) throws ACIDException { if (endOffset > beginOffset) { - logPageReader.initializeScan(beginOffset, endOffset); + logBufferTailReader.initializeScan(beginOffset, endOffset); ITransactionContext txnCtx = null; - LogRecord logRecord = logPageReader.next(); + LogRecord logRecord = logBufferTailReader.next(); while (logRecord != null) { if (logRecord.getLogType() == LogType.ENTITY_COMMIT) { reusableJobId.setId(logRecord.getJobId()); @@ -234,7 +234,7 @@ notifyFlushTerminator(); } - logRecord = logPageReader.next(); + logRecord = logBufferTailReader.next(); } } } diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPageReader.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java similarity index 78% rename from asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPageReader.java rename to asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java index 76648ae..f8e0253 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPageReader.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java @@ -20,15 +20,17 @@ import java.nio.ByteBuffer; +import org.apache.asterix.common.transactions.ILogRecord; +import org.apache.asterix.common.transactions.ILogRecord.RECORD_STATUS; import org.apache.asterix.common.transactions.LogRecord; -public class LogPageReader { +public class LogBufferTailReader { private final ByteBuffer buffer; private final LogRecord logRecord; private int endOffset; - public LogPageReader(ByteBuffer buffer) { + public LogBufferTailReader(ByteBuffer buffer) { this.buffer = buffer; logRecord = new LogRecord(); } @@ -42,7 +44,9 @@ if (buffer.position() == endOffset) { return null; } - if (!logRecord.readLogRecord(buffer)) { + RECORD_STATUS status = logRecord.readLogRecord(buffer); + //underflow is not expected because we are at the very tail of the current log buffer + if (status != RECORD_STATUS.OK) { throw new IllegalStateException(); } return logRecord; diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index b531961..f14c146 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -62,11 +62,11 @@ private final String logDir; private final String logFilePrefix; private final MutableLong flushLSN; - private LinkedBlockingQueue<LogPage> emptyQ; - private LinkedBlockingQueue<LogPage> flushQ; + private LinkedBlockingQueue<LogBuffer> emptyQ; + private LinkedBlockingQueue<LogBuffer> flushQ; private final AtomicLong appendLSN; private FileChannel appendChannel; - private LogPage appendPage; + private LogBuffer appendPage; private LogFlusher logFlusher; private Future<Object> futureLogFlusher; private static final long SMALLEST_LOG_FILE_ID = 0; @@ -86,10 +86,10 @@ } private void initializeLogManager(long nextLogFileId) { - emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages); - flushQ = new LinkedBlockingQueue<LogPage>(numLogPages); + emptyQ = new LinkedBlockingQueue<LogBuffer>(numLogPages); + flushQ = new LinkedBlockingQueue<LogBuffer>(numLogPages); for (int i = 0; i < numLogPages; i++) { - emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN)); + emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN)); } appendLSN.set(initializeLogAnchor(nextLogFileId)); flushLSN.set(appendLSN.get()); @@ -174,7 +174,7 @@ appendPage.isLastPage(true); //[Notice] //the current log file channel is closed if - //LogPage.flush() completely flush the last page of the file. + //LogBuffer.flush() completely flush the last page of the file. } @Override @@ -443,15 +443,15 @@ class LogFlusher implements Callable<Boolean> { private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName()); - private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null); + private final static LogBuffer POISON_PILL = new LogBuffer(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null); private final LogManager logMgr;//for debugging - private final LinkedBlockingQueue<LogPage> emptyQ; - private final LinkedBlockingQueue<LogPage> flushQ; - private LogPage flushPage; + private final LinkedBlockingQueue<LogBuffer> emptyQ; + private final LinkedBlockingQueue<LogBuffer> flushQ; + private LogBuffer flushPage; private final AtomicBoolean isStarted; private final AtomicBoolean terminateFlag; - public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogPage> emptyQ, LinkedBlockingQueue<LogPage> flushQ) { + public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer> flushQ) { this.logMgr = logMgr; this.emptyQ = emptyQ; this.flushQ = flushQ; diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java index 6fa0ebb..9900468 100644 --- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java +++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java @@ -29,6 +29,12 @@ import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.MutableLong; +import static org.apache.asterix.common.transactions.LogRecord.*; + +/** + * NOTE: Many method calls of this class are not thread safe. + * Be very cautious using it in a multithreaded context. + */ public class LogReader implements ILogReader { public static final boolean IS_DEBUG_MODE = false;//true @@ -67,20 +73,56 @@ return; } getFileChannel(); - readPage(); + fillLogReadBuffer(); } - //for scanning + /** + * Get the next log record from the log file. + * @return A deserialized log record, or null if we have reached the end of the file. + * @throws ACIDException + */ @Override public ILogRecord next() throws ACIDException { if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) { return null; } - if (readBuffer.position() == readBuffer.limit() || !logRecord.readLogRecord(readBuffer)) { - readNextPage(); - if (!logRecord.readLogRecord(readBuffer)) { - throw new IllegalStateException(); + if (readBuffer.position() == readBuffer.limit()) { + boolean eof = refillLogReadBuffer(); + if (eof && isRecoveryMode && readLSN < flushLSN.get()) { + LOGGER.severe("Transaction log ends before expected. Log files may be missing."); + return null; } + } + + RECORD_STATUS status = logRecord.readLogRecord(readBuffer); + switch(status) { + case TRUNCATED: { + //we may have just read off the end of the buffer, so try refiling it + if(!refillLogReadBuffer()) { + return null; + } + //now see what we have in the refilled buffer + status = logRecord.readLogRecord(readBuffer); + switch(status){ + case TRUNCATED: { + LOGGER.info("Log file has truncated log records."); + return null; + } + case BAD_CHKSUM:{ + LOGGER.severe("Transaction log contains corrupt log records (perhaps due to medium error). Stopping recovery early."); + return null; + } + case OK: break; + } + //if we have exited the inner switch, + // this means status is really "OK" after buffer refill + break; + } + case BAD_CHKSUM:{ + LOGGER.severe("Transaction log contains corrupt log records (perhaps due to medium error). Stopping recovery early."); + return null; + } + case OK: break; } logRecord.setLSN(readLSN); readLSN += logRecord.getLogSize(); @@ -107,32 +149,55 @@ } } - private void readNextPage() throws ACIDException { + /** + * Continues log analysis between log file splits. + * @return true if log continues, false if EOF + * @throws ACIDException + */ + private boolean refillLogReadBuffer() throws ACIDException { try { if (readLSN % logFileSize == fileChannel.size()) { fileChannel.close(); readLSN += logFileSize - (readLSN % logFileSize); getFileChannel(); } - readPage(); + return fillLogReadBuffer(); } catch (IOException e) { throw new ACIDException(e); } } - private void readPage() throws ACIDException { - int size; + /** + * Fills the log buffer with data from the log file at the current position + * @return false if EOF, true otherwise + * @throws ACIDException + */ + + private boolean fillLogReadBuffer() throws ACIDException { + int size=0; + int read=0; readBuffer.position(0); readBuffer.limit(logPageSize); try { fileChannel.position(readLSN % logFileSize); - size = fileChannel.read(readBuffer); + //We loop here because read() may return 0, but this simply means we are waiting on IO. + //Therefore we want to break out only when either the buffer is full, or we reach EOF. + while( size < logPageSize && read != -1) { + read = fileChannel.read(readBuffer); + if(read>0) { + size += read; + } + } } catch (IOException e) { throw new ACIDException(e); } readBuffer.position(0); readBuffer.limit(size); + if(size == 0 && read == -1){ + return false; //EOF + } bufferBeginLSN = readLSN; + return true; } //for random reading @@ -151,25 +216,37 @@ try { if (fileChannel == null) { getFileChannel(); - readPage(); + fillLogReadBuffer(); } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + fileChannel.size()) { fileChannel.close(); getFileChannel(); - readPage(); + fillLogReadBuffer(); } else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit()) { - readPage(); + fillLogReadBuffer(); } else { readBuffer.position((int) (readLSN - bufferBeginLSN)); } } catch (IOException e) { throw new ACIDException(e); } - if (!logRecord.readLogRecord(readBuffer)) { - readNextPage(); - if (!logRecord.readLogRecord(readBuffer)) { - throw new IllegalStateException(); + boolean hasRemaining; + if(readBuffer.position() == readBuffer.limit()){ + hasRemaining = refillLogReadBuffer(); + if(!hasRemaining){ + throw new ACIDException("LSN is out of bounds"); } } + RECORD_STATUS status = logRecord.readLogRecord(readBuffer); + switch(status){ + case TRUNCATED:{ + throw new ACIDException("LSN is out of bounds"); + } + case BAD_CHKSUM:{ + throw new ACIDException("Log record has incorrect checksum"); + } + case OK: break; + + } logRecord.setLSN(readLSN); readLSN += logRecord.getLogSize(); return logRecord; -- To view, visit https://asterix-gerrit.ics.uci.edu/289 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I1658e938eb0f199f748407361ffee4833aac661c Gerrit-PatchSet: 19 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Ian Maxon <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Young-Seok Kim <[email protected]>
