This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new d22d545a33 [NO ISSUE][STO] Add API to ensure minimum LSN d22d545a33 is described below commit d22d545a336a07baba7578586f8cd606663d1ce5 Author: Murtadha Hubail <mhub...@apache.org> AuthorDate: Thu Nov 9 02:25:17 2023 +0300 [NO ISSUE][STO] Add API to ensure minimum LSN - user model changes: no - storage format changes: no - interface changes: yes Details: - Add API in the log manager that can be used to advance the append LSN to a specific LSN. - Add API to get the maximum LSN used in specific storage partitions. Change-Id: Ic5f91f08a75f3737e2b33373cfd9500d78ba83d2 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17934 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../org/apache/asterix/app/nc/RecoveryManager.java | 25 ++++++++++++++++++++++ .../asterix/common/transactions/ILogManager.java | 10 +++++++-- .../common/transactions/IRecoveryManager.java | 9 ++++++++ .../management/service/logging/LogManager.java | 14 ++++++++++-- .../service/recovery/CheckpointManager.java | 3 ++- 5 files changed, 56 insertions(+), 5 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index f6eb123df3..a7d30a3405 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -519,6 +519,31 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { return minRemoteLSN; } + @Override + public long getPartitionsMaxLSN(Set<Integer> partitions) throws HyracksDataException { + final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = appCtx.getIndexCheckpointManagerProvider(); + long maxLSN = 0; + for (Integer partition : partitions) { + final List<DatasetResourceReference> partitionResources = localResourceRepository.getResources(resource -> { + DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); + return dsResource.getPartition() == partition; + }, Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of) + .collect(Collectors.toList()); + for (DatasetResourceReference indexRef : partitionResources) { + try { + final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef); + if (idxCheckpointMgr.isValidIndex()) { + long indexMaxLsn = idxCheckpointMgrProvider.get(indexRef).getLowWatermark(); + maxLSN = Math.max(maxLSN, indexMaxLsn); + } + } catch (Exception e) { + LOGGER.warn("Failed to get max LSN of resource {}", indexRef, e); + } + } + } + return maxLSN; + } + @Override public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java index d7e0885464..9d89c07577 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java @@ -99,7 +99,13 @@ public interface ILogManager { public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException; /** - * Deletes all current log files and start the next log file partition + * Deletes all current log files and start the next log file partition after {@code minLSN} */ - void renewLogFiles(); + void renewLogFiles(long minLSN); + + /** + * Ensures the next lsn of this log manager is greater than {@code lsn} + * @param lsn + */ + void ensureMinLSN(long lsn); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index 8a5f34eb07..aef22154f3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -109,6 +109,15 @@ public interface IRecoveryManager { */ void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException; + /** + * Gets the LSN used in {@code partitions} or 0 if no LSNs are found + * + * @param partitions + * @return the maximum used LSN + * @throws HyracksDataException + */ + long getPartitionsMaxLSN(Set<Integer> partitions) throws HyracksDataException; + /** * Replay the commited transactions' logs belonging to {@code partitions}. if {@code flush} is true, * all datasets are flushed after the logs are replayed. diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index e66185c839..a502b302a8 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -377,16 +377,26 @@ public class LogManager implements ILogManager, ILifeCycleComponent { } @Override - public void renewLogFiles() { + public void renewLogFiles(long minLSN) { terminateLogFlusher(); closeCurrentLogFile(); - long nextLogFileId = getNextLogFileId(); + long nextLogFileId = getLogFileId(minLSN) + 1; + LOGGER.info("renewing txn log files; next file id {}", nextLogFileId); createFileIfNotExists(getLogFilePath(nextLogFileId)); final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId); deleteOldLogFiles(logFileFirstLsn); initializeLogManager(nextLogFileId); } + @Override + public void ensureMinLSN(long lsn) { + if (appendLSN.get() > lsn) { + LOGGER.info("current append lsn {} > target min LSN {}; not renewing log files", appendLSN.get(), lsn); + return; + } + renewLogFiles(lsn); + } + @Override public void deleteOldLogFiles(long checkpointLSN) { Long checkpointLSNLogFileID = getLogFileId(checkpointLSN); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java index 460f393b17..6761ea91ae 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java @@ -65,7 +65,8 @@ public class CheckpointManager extends AbstractCheckpointManager { txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); datasetLifecycleManager.flushAllDatasets(); capture(SHARP_CHECKPOINT_LSN, true); - txnSubsystem.getLogManager().renewLogFiles(); + long currentAppendLNS = txnSubsystem.getLogManager().getAppendLSN(); + txnSubsystem.getLogManager().renewLogFiles(currentAppendLNS); LOGGER.info("Completed sharp checkpoint."); }