This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d22d545a33 [NO ISSUE][STO] Add API to ensure minimum LSN
d22d545a33 is described below

commit d22d545a336a07baba7578586f8cd606663d1ce5
Author: Murtadha Hubail <mhub...@apache.org>
AuthorDate: Thu Nov 9 02:25:17 2023 +0300

    [NO ISSUE][STO] Add API to ensure minimum LSN
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Add API in the log manager that can be used to advance the
      append LSN to a specific LSN.
    - Add API to get the maximum LSN used in specific storage partitions.
    
    Change-Id: Ic5f91f08a75f3737e2b33373cfd9500d78ba83d2
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17934
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../org/apache/asterix/app/nc/RecoveryManager.java | 25 ++++++++++++++++++++++
 .../asterix/common/transactions/ILogManager.java   | 10 +++++++--
 .../common/transactions/IRecoveryManager.java      |  9 ++++++++
 .../management/service/logging/LogManager.java     | 14 ++++++++++--
 .../service/recovery/CheckpointManager.java        |  3 ++-
 5 files changed, 56 insertions(+), 5 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f6eb123df3..a7d30a3405 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -519,6 +519,31 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         return minRemoteLSN;
     }
 
+    @Override
+    public long getPartitionsMaxLSN(Set<Integer> partitions) throws 
HyracksDataException {
+        final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = 
appCtx.getIndexCheckpointManagerProvider();
+        long maxLSN = 0;
+        for (Integer partition : partitions) {
+            final List<DatasetResourceReference> partitionResources = 
localResourceRepository.getResources(resource -> {
+                DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
+                return dsResource.getPartition() == partition;
+            }, 
Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of)
+                    .collect(Collectors.toList());
+            for (DatasetResourceReference indexRef : partitionResources) {
+                try {
+                    final IIndexCheckpointManager idxCheckpointMgr = 
idxCheckpointMgrProvider.get(indexRef);
+                    if (idxCheckpointMgr.isValidIndex()) {
+                        long indexMaxLsn = 
idxCheckpointMgrProvider.get(indexRef).getLowWatermark();
+                        maxLSN = Math.max(maxLSN, indexMaxLsn);
+                    }
+                } catch (Exception e) {
+                    LOGGER.warn("Failed to get max LSN of resource {}", 
indexRef, e);
+                }
+            }
+        }
+        return maxLSN;
+    }
+
     @Override
     public synchronized void replayReplicaPartitionLogs(Set<Integer> 
partitions, boolean flush)
             throws HyracksDataException {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index d7e0885464..9d89c07577 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -99,7 +99,13 @@ public interface ILogManager {
     public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) 
throws IOException;
 
     /**
-     * Deletes all current log files and start the next log file partition
+     * Deletes all current log files and start the next log file partition 
after {@code minLSN}
      */
-    void renewLogFiles();
+    void renewLogFiles(long minLSN);
+
+    /**
+     * Ensures the next lsn of this log manager is greater than {@code lsn}
+     * @param lsn
+     */
+    void ensureMinLSN(long lsn);
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 8a5f34eb07..aef22154f3 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -109,6 +109,15 @@ public interface IRecoveryManager {
      */
     void startLocalRecovery(Set<Integer> partitions) throws IOException, 
ACIDException;
 
+    /**
+     * Gets the LSN used in {@code partitions} or 0 if no LSNs are found
+     *
+     * @param partitions
+     * @return the maximum used LSN
+     * @throws HyracksDataException
+     */
+    long getPartitionsMaxLSN(Set<Integer> partitions) throws 
HyracksDataException;
+
     /**
      * Replay the commited transactions' logs belonging to {@code partitions}. 
if {@code flush} is true,
      * all datasets are flushed after the logs are replayed.
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index e66185c839..a502b302a8 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -377,16 +377,26 @@ public class LogManager implements ILogManager, 
ILifeCycleComponent {
     }
 
     @Override
-    public void renewLogFiles() {
+    public void renewLogFiles(long minLSN) {
         terminateLogFlusher();
         closeCurrentLogFile();
-        long nextLogFileId = getNextLogFileId();
+        long nextLogFileId = getLogFileId(minLSN) + 1;
+        LOGGER.info("renewing txn log files; next file id {}", nextLogFileId);
         createFileIfNotExists(getLogFilePath(nextLogFileId));
         final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId);
         deleteOldLogFiles(logFileFirstLsn);
         initializeLogManager(nextLogFileId);
     }
 
+    @Override
+    public void ensureMinLSN(long lsn) {
+        if (appendLSN.get() > lsn) {
+            LOGGER.info("current append lsn {} > target min LSN {}; not 
renewing log files", appendLSN.get(), lsn);
+            return;
+        }
+        renewLogFiles(lsn);
+    }
+
     @Override
     public void deleteOldLogFiles(long checkpointLSN) {
         Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 460f393b17..6761ea91ae 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -65,7 +65,8 @@ public class CheckpointManager extends 
AbstractCheckpointManager {
                 
txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
         datasetLifecycleManager.flushAllDatasets();
         capture(SHARP_CHECKPOINT_LSN, true);
-        txnSubsystem.getLogManager().renewLogFiles();
+        long currentAppendLNS = txnSubsystem.getLogManager().getAppendLSN();
+        txnSubsystem.getLogManager().renewLogFiles(currentAppendLNS);
         LOGGER.info("Completed sharp checkpoint.");
     }
 

Reply via email to