Luo Chen has submitted this change and it was merged.

Change subject: [ASTERIXDB-2188] Ensure recovery of component ids
......................................................................


[ASTERIXDB-2188] Ensure recovery of component ids

- user model changes: no
- storage format changes: yes.
  Flush log record format changes.
- interface changes: no

Details:
- Add flush component ids to the flush log record. Upon
seeing a flush log record during recovery, schedule
a flush to all indexes in this partition s.t. LSN>maxDiskLSN
to ensure component ids are properly maintained upon
failed flushes.
- Add a test case to ensure the correctness of the recovery logic
of component ids

Change-Id: I8c1fc2b209cfb9d3dafa216771d2b7032eb99e75
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2408
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamou...@gmail.com>
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
M 
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
M 
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
A 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java
M 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
18 files changed, 667 insertions(+), 63 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; ; Verified

Objections:
  Jenkins: Violations found



diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 273d832..a8d8610 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -44,6 +44,8 @@
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -69,9 +71,14 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.logging.log4j.Level;
@@ -284,6 +291,7 @@
         TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, 
false);
 
         ILogRecord logRecord = null;
+        ILSMComponentIdGenerator idGenerator = null;
         try {
             logReader.setPosition(lowWaterMarkLSN);
             logRecord = logReader.next();
@@ -363,10 +371,51 @@
                             }
                         }
                         break;
+                    case LogType.FLUSH:
+                        int partition = logRecord.getResourcePartition();
+                        if (partitions.contains(partition)) {
+                            int datasetId = logRecord.getDatasetId();
+                            idGenerator = 
datasetLifecycleManager.getComponentIdGenerator(datasetId, partition);
+                            if (idGenerator == null) {
+                                // it's possible this dataset has been dropped
+                                logRecord = logReader.next();
+                                continue;
+                            }
+                            idGenerator.refresh();
+                            DatasetInfo dsInfo = 
datasetLifecycleManager.getDatasetInfo(datasetId);
+                            // we only need to flush open indexes here (opened 
by previous update records)
+                            // if an index has no ongoing updates, then it's 
memory component must be empty
+                            // and there is nothing to flush
+                            for (IndexInfo iInfo : 
dsInfo.getIndexes().values()) {
+                                if (iInfo.isOpen()) {
+                                    maxDiskLastLsn = 
resourceId2MaxLSNMap.get(iInfo.getResourceId());
+                                    index = iInfo.getIndex();
+                                    AbstractLSMIOOperationCallback ioCallback =
+                                            (AbstractLSMIOOperationCallback) 
index.getIOOperationCallback();
+                                    if (logRecord.getLSN() > maxDiskLastLsn
+                                            && 
!index.isCurrentMutableComponentEmpty()) {
+                                        // schedule flush
+                                        
ioCallback.updateLastLSN(logRecord.getLSN());
+                                        redoFlush(index, logRecord);
+                                        redoCount++;
+                                    } else {
+                                        if 
(index.isMemoryComponentsAllocated()) {
+                                            // if the memory component has 
been allocated, we
+                                            // force it to receive the same Id
+                                            
index.getCurrentMemoryComponent().resetId(idGenerator.getId(), true);
+                                        } else {
+                                            // otherwise, we refresh the id 
stored in ioCallback
+                                            // to ensure the memory component 
receives correct Id upon activation
+                                            ioCallback.forceRefreshNextId();
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        break;
                     case LogType.JOB_COMMIT:
                     case LogType.ENTITY_COMMIT:
                     case LogType.ABORT:
-                    case LogType.FLUSH:
                     case LogType.WAIT:
                     case LogType.MARKER:
                         //do nothing
@@ -736,6 +785,23 @@
         }
     }
 
+    private static void redoFlush(ILSMIndex index, ILogRecord logRecord) 
throws HyracksDataException {
+        ILSMIndexAccessor accessor = 
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+        long minId = logRecord.getFlushingComponentMinId();
+        long maxId = logRecord.getFlushingComponentMaxId();
+        ILSMComponentId id = new LSMComponentId(minId, maxId);
+        if (!index.getDiskComponents().isEmpty()) {
+            ILSMDiskComponent diskComponent = index.getDiskComponents().get(0);
+            ILSMComponentId maxDiskComponentId = diskComponent.getId();
+            if (maxDiskComponentId.compareTo(id) != IdCompareResult.LESS_THAN) 
{
+                throw new IllegalStateException("Illegal state of component 
Id. Max disk component Id "
+                        + maxDiskComponentId + " should be less than redo 
flush component Id " + id);
+            }
+        }
+        index.getCurrentMemoryComponent().resetId(id, true);
+        accessor.scheduleFlush(index.getIOOperationCallback());
+    }
+
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
         private final long txnId;
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 6c4d068..f99429c 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -121,8 +121,8 @@
 
     protected static final String TEST_CONFIG_FILE_NAME = 
"src/main/resources/cc.conf";
     protected static TransactionProperties txnProperties;
-    private static final boolean cleanupOnStart = true;
-    private static final boolean cleanupOnStop = true;
+    private static final boolean CLEANUP_ON_START = true;
+    private static final boolean CLEANUP_ON_STOP = true;
 
     // Constants
     public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
@@ -142,6 +142,10 @@
     }
 
     public void init() throws Exception {
+        init(CLEANUP_ON_START);
+    }
+
+    public void init(boolean cleanupOnStart) throws Exception {
         try {
             File outdir = new File(PATH_ACTUAL);
             outdir.mkdirs();
@@ -157,12 +161,20 @@
     }
 
     public void deInit() throws Exception {
+        deInit(CLEANUP_ON_STOP);
+    }
+
+    public void deInit(boolean cleanupOnStop) throws Exception {
         ExternalUDFLibrarian.removeLibraryDir();
         ExecutionTestUtil.tearDown(cleanupOnStop);
     }
 
     public void setOpts(List<Pair<IOption, Object>> opts) {
         options.addAll(opts);
+    }
+
+    public void clearOpts() {
+        options.clear();
     }
 
     public TxnId getTxnJobId(IHyracksTaskContext ctx) {
@@ -241,10 +253,15 @@
                 SecondaryIndexInfo secondaryIndexInfo = new 
SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
                 IIndexDataflowHelperFactory secondaryIndexHelperFactory = new 
IndexDataflowHelperFactory(
                         storageComponentProvider.getStorageManager(), 
secondaryIndexInfo.fileSplitProvider);
+
+                IModificationOperationCallbackFactory 
secondaryModCallbackFactory =
+                        
dataset.getModificationCallbackFactory(storageComponentProvider, secondaryIndex,
+                                IndexOperation.INSERT, primaryKeyIndexes);
+
                 LSMInsertDeleteOperatorNodePushable secondaryInsertOp =
                         new LSMInsertDeleteOperatorNodePushable(ctx, 
ctx.getTaskAttemptId().getTaskId().getPartition(),
                                 secondaryIndexInfo.insertFieldsPermutations, 
secondaryIndexInfo.rDesc, op, false,
-                                secondaryIndexHelperFactory, 
NoOpOperationCallbackFactory.INSTANCE, null);
+                                secondaryIndexHelperFactory, 
secondaryModCallbackFactory, null);
                 assignOp.setOutputFrameWriter(0, secondaryInsertOp, 
secondaryIndexInfo.rDesc);
                 CommitRuntime commitOp = new CommitRuntime(ctx, 
getTxnJobId(ctx), dataset.getDatasetId(),
                         secondaryIndexInfo.primaryKeyIndexes, true, 
ctx.getTaskAttemptId().getTaskId().getPartition(),
@@ -465,12 +482,12 @@
     }
 
     public static class SecondaryIndexInfo {
-        private int[] primaryKeyIndexes;
-        private PrimaryIndexInfo primaryIndexInfo;
-        private Index secondaryIndex;
-        private ConstantFileSplitProvider fileSplitProvider;
-        private RecordDescriptor rDesc;
-        private int[] insertFieldsPermutations;
+        private final int[] primaryKeyIndexes;
+        private final PrimaryIndexInfo primaryIndexInfo;
+        private final Index secondaryIndex;
+        private final ConstantFileSplitProvider fileSplitProvider;
+        private final RecordDescriptor rDesc;
+        private final int[] insertFieldsPermutations;
 
         public SecondaryIndexInfo(PrimaryIndexInfo primaryIndexInfo, Index 
secondaryIndex) {
             this.primaryIndexInfo = primaryIndexInfo;
@@ -504,20 +521,20 @@
     }
 
     public static class PrimaryIndexInfo {
-        private Dataset dataset;
-        private IAType[] primaryKeyTypes;
-        private ARecordType recordType;
-        private ARecordType metaType;
-        private ILSMMergePolicyFactory mergePolicyFactory;
-        private Map<String, String> mergePolicyProperties;
-        private int primaryIndexNumOfTupleFields;
-        private ITypeTraits[] primaryIndexTypeTraits;
-        private ISerializerDeserializer<?>[] primaryIndexSerdes;
-        private ConstantFileSplitProvider fileSplitProvider;
-        private RecordDescriptor rDesc;
-        private int[] primaryIndexInsertFieldsPermutations;
-        private int[] primaryKeyIndexes;
-        private Index index;
+        private final Dataset dataset;
+        private final IAType[] primaryKeyTypes;
+        private final ARecordType recordType;
+        private final ARecordType metaType;
+        private final ILSMMergePolicyFactory mergePolicyFactory;
+        private final Map<String, String> mergePolicyProperties;
+        private final int primaryIndexNumOfTupleFields;
+        private final ITypeTraits[] primaryIndexTypeTraits;
+        private final ISerializerDeserializer<?>[] primaryIndexSerdes;
+        private final ConstantFileSplitProvider fileSplitProvider;
+        private final RecordDescriptor rDesc;
+        private final int[] primaryIndexInsertFieldsPermutations;
+        private final int[] primaryKeyIndexes;
+        private final Index index;
 
         public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, 
ARecordType recordType, ARecordType metaType,
                 ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyProperties,
@@ -563,6 +580,10 @@
             return index;
         }
 
+        public Dataset getDataset() {
+            return dataset;
+        }
+
         public IRecordDescriptorProvider getInsertRecordDescriptorProvider() {
             IRecordDescriptorProvider rDescProvider = 
Mockito.mock(IRecordDescriptorProvider.class);
             Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), 
Mockito.anyInt())).thenReturn(rDesc);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index c6232f5..9ef531e 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -47,6 +47,7 @@
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -282,7 +283,7 @@
             lsmAccessor.deleteComponents(
                     c -> (c instanceof ILSMMemoryComponent && 
((ILSMMemoryComponent) c).isModified()));
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // search now and ensure
@@ -303,7 +304,7 @@
             DiskComponentLsnPredicate pred = new 
DiskComponentLsnPredicate(lsn);
             lsmAccessor.deleteComponents(pred);
             // now that the rollback has completed, we will unblock the search
-            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(secondSearcher.result());
             StorageTestUtils.searchAndAssertCount(nc, PARTITION,
@@ -477,7 +478,7 @@
             Rollerback rollerback = new Rollerback(lsmBtree, 
memoryComponentsPredicate);
             //unblock the flush
             lsmBtree.allowFlush(1);
-            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // ensure current mem component is not modified
@@ -535,7 +536,7 @@
             // now that we enetered, we will rollback
             Rollerback rollerback = new Rollerback(lsmBtree, 
memoryComponentsPredicate);
             // The rollback will be waiting for the flush to complete
-            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             //unblock the flush
@@ -606,7 +607,7 @@
             // unblock the merge
             lsmBtree.allowMerge(1);
             // unblock the search
-            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             rollerback.complete();
@@ -673,7 +674,7 @@
             // now that we enetered, we will rollback
             Rollerback rollerBack = new Rollerback(lsmBtree, new 
DiskComponentLsnPredicate(lsn));
             // unblock the search
-            lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+            lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
             lsmBtree.allowSearch(1);
             Assert.assertTrue(firstSearcher.result());
             // even though rollback has been called, it is still waiting for 
the merge to complete
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
new file mode 100644
index 0000000..b10e9b1
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.StorageProperties.Option;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LSMFlushRecoveryTest {
+
+    private static TestNodeController nc;
+    private static Dataset dataset;
+    private static PrimaryIndexInfo primaryIndexInfo;
+    private static SecondaryIndexInfo secondaryIndexInfo;
+    private static TestLsmBtree primaryIndex;
+    private static TestLsmBtree secondaryIndex;
+    private static Index secondaryIndexEntity;
+    private static NCAppRuntimeContext ncAppCtx;
+    private static IDatasetLifecycleManager dsLifecycleMgr;
+
+    private static IHyracksTaskContext ctx;
+    private static IIndexDataflowHelper primaryIndexDataflowHelper;
+    private static IIndexDataflowHelper secondaryIndexDataflowHelper;
+    private static ITransactionContext txnCtx;
+    private static LSMInsertDeleteOperatorNodePushable insertOp;
+    private static final int PARTITION = 0;
+    private static TupleGenerator tupleGenerator;
+
+    private static final String SECONDARY_INDEX_NAME = "TestIdx";
+    private static final IndexType SECONDARY_INDEX_TYPE = IndexType.BTREE;
+    private static final List<List<String>> SECONDARY_INDEX_FIELD_NAMES =
+            
Arrays.asList(Arrays.asList(StorageTestUtils.RECORD_TYPE.getFieldNames()[1]));
+    private static final List<Integer> SECONDARY_INDEX_FIELD_INDICATORS = 
Arrays.asList(Index.RECORD_INDICATOR);
+    private static final List<IAType> SECONDARY_INDEX_FIELD_TYPES = 
Arrays.asList(BuiltinType.AINT64);
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        String configPath = System.getProperty("user.dir") + File.separator + 
"src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "cc.conf";
+        nc = new TestNodeController(configPath, false);
+    }
+
+    @Before
+    public void initializeTest() throws Exception {
+        // initialize NC before each test
+        initializeNc(true);
+        initializeTestCtx();
+        createIndex();
+        readIndex();
+        insertOp = StorageTestUtils.getInsertPipeline(nc, ctx, 
secondaryIndexEntity);
+        tupleGenerator = StorageTestUtils.getTupleGenerator();
+    }
+
+    @After
+    public void testRecovery() {
+        try {
+            // right now we've inserted 1000 records to the index, and each 
record is at least 12 bytes.
+            // thus, the memory component size is at least 12KB.
+            List<Pair<IOption, Object>> opts = new ArrayList<>();
+            opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, 
"128MB"));
+            opts.add(Pair.of(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, 
"10000"));
+            nc.setOpts(opts);
+            nc.init(false);
+            initializeTestCtx();
+            readIndex();
+            checkComponentIds();
+            insertOp = StorageTestUtils.getInsertPipeline(nc, ctx, 
secondaryIndexEntity);
+            // insert more records
+            insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, 
StorageTestUtils.RECORDS_PER_COMPONENT);
+            checkComponentIds();
+
+            dropIndex();
+            // cleanup after each test case
+            nc.deInit(true);
+            nc.clearOpts();
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    private void initializeNc(boolean cleanUpOnStart) throws Exception {
+        nc.init(cleanUpOnStart);
+        ncAppCtx = nc.getAppRuntimeContext();
+        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+    }
+
+    private void createIndex() throws Exception {
+        primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
+        dataset = primaryIndexInfo.getDataset();
+        secondaryIndexEntity = new Index(dataset.getDataverseName(), 
dataset.getDatasetName(), SECONDARY_INDEX_NAME,
+                SECONDARY_INDEX_TYPE, SECONDARY_INDEX_FIELD_NAMES, 
SECONDARY_INDEX_FIELD_INDICATORS,
+                SECONDARY_INDEX_FIELD_TYPES, false, false, false, 0);
+        secondaryIndexInfo = nc.createSecondaryIndex(primaryIndexInfo, 
secondaryIndexEntity,
+                StorageTestUtils.STORAGE_MANAGER, PARTITION);
+    }
+
+    private void initializeTestCtx() throws Exception {
+        JobId jobId = nc.newJobId();
+        ctx = nc.createTestContext(jobId, PARTITION, false);
+        txnCtx = 
nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+                new 
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+    }
+
+    private void readIndex() throws HyracksDataException {
+        IndexDataflowHelperFactory primaryHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), 
primaryIndexInfo.getFileSplitProvider());
+        primaryIndexDataflowHelper = 
primaryHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
PARTITION);
+        primaryIndexDataflowHelper.open();
+        primaryIndex = (TestLsmBtree) 
primaryIndexDataflowHelper.getIndexInstance();
+        primaryIndexDataflowHelper.close();
+
+        IndexDataflowHelperFactory secodnaryIHelperFactory =
+                new IndexDataflowHelperFactory(nc.getStorageManager(), 
secondaryIndexInfo.getFileSplitProvider());
+        secondaryIndexDataflowHelper =
+                
secodnaryIHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
PARTITION);
+        secondaryIndexDataflowHelper.open();
+        secondaryIndex = (TestLsmBtree) 
secondaryIndexDataflowHelper.getIndexInstance();
+        secondaryIndexDataflowHelper.close();
+    }
+
+    private void dropIndex() throws HyracksDataException {
+        primaryIndexDataflowHelper.destroy();
+        secondaryIndexDataflowHelper.destroy();
+    }
+
+    @Test
+    public void testBothFlushSucceed() throws Exception {
+        insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, 
StorageTestUtils.RECORDS_PER_COMPONENT);
+        // shutdown the server
+        nc.deInit(false);
+    }
+
+    @Test
+    public void testSecondaryFlushFails() throws Exception {
+        insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, 
StorageTestUtils.RECORDS_PER_COMPONENT);
+
+        primaryIndex.clearFlushCallbacks();
+        secondaryIndex.clearFlushCallbacks();
+
+        Semaphore primaryFlushSemaphore = new Semaphore(0);
+        secondaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+            @Override
+            public void before(Semaphore t) throws HyracksDataException {
+                throw new HyracksDataException("Kill the flush thread");
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+
+            }
+        });
+
+        primaryIndex.addFlushCallback(AllowTestOpCallback.INSTANCE);
+        primaryIndex.addIoAfterFinalizeCallback(new ITestOpCallback<Void>() {
+            @Override
+            public void before(Void t) throws HyracksDataException {
+
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+                primaryFlushSemaphore.release();
+            }
+        });
+        StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true);
+
+        primaryFlushSemaphore.acquire();
+        List<ILSMDiskComponent> primaryComponents = 
primaryIndex.getDiskComponents();
+        List<ILSMDiskComponent> secondaryComponents = 
secondaryIndex.getDiskComponents();
+        Assert.assertEquals(primaryComponents.size(), 
secondaryComponents.size() + 1);
+        // shutdown the NC
+        nc.deInit(false);
+    }
+
+    @Test
+    public void testPrimaryFlushFails() throws Exception {
+        insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, 
StorageTestUtils.RECORDS_PER_COMPONENT);
+
+        primaryIndex.clearFlushCallbacks();
+        secondaryIndex.clearFlushCallbacks();
+
+        Semaphore secondaryFlushSemaphore = new Semaphore(0);
+
+        primaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+            @Override
+            public void before(Semaphore t) throws HyracksDataException {
+                throw new HyracksDataException("Kill the flush thread");
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+
+            }
+        });
+
+        secondaryIndex.addFlushCallback(AllowTestOpCallback.INSTANCE);
+        secondaryIndex.addIoAfterFinalizeCallback(new ITestOpCallback<Void>() {
+            @Override
+            public void before(Void t) throws HyracksDataException {
+
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+                secondaryFlushSemaphore.release();
+            }
+        });
+        StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true);
+
+        secondaryFlushSemaphore.acquire();
+        List<ILSMDiskComponent> primaryComponents = 
primaryIndex.getDiskComponents();
+        List<ILSMDiskComponent> secondaryComponents = 
secondaryIndex.getDiskComponents();
+        Assert.assertEquals(secondaryComponents.size(), 
primaryComponents.size() + 1);
+        // shutdown the NC
+        nc.deInit(false);
+    }
+
+    @Test
+    public void testBothFlushFail() throws Exception {
+        insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, 
StorageTestUtils.RECORDS_PER_COMPONENT);
+
+        primaryIndex.clearFlushCallbacks();
+        secondaryIndex.clearFlushCallbacks();
+
+        Semaphore primaryFlushSemaphore = new Semaphore(0);
+        Semaphore secondaryFlushSemaphore = new Semaphore(0);
+
+        primaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+            @Override
+            public void before(Semaphore t) throws HyracksDataException {
+                primaryFlushSemaphore.release();
+                throw new HyracksDataException("Kill the flush thread");
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+
+            }
+        });
+
+        secondaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+            @Override
+            public void before(Semaphore t) throws HyracksDataException {
+                secondaryFlushSemaphore.release();
+                throw new HyracksDataException("Kill the fluhs thread");
+            }
+
+            @Override
+            public void after() throws HyracksDataException {
+
+            }
+        });
+        StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true);
+
+        primaryFlushSemaphore.acquire();
+        secondaryFlushSemaphore.acquire();
+        List<ILSMDiskComponent> primaryComponents = 
primaryIndex.getDiskComponents();
+        List<ILSMDiskComponent> secondaryComponents = 
secondaryIndex.getDiskComponents();
+        Assert.assertEquals(secondaryComponents.size(), 
primaryComponents.size());
+        // shutdown the NC
+        nc.deInit(false);
+    }
+
+    private void insertRecords(int totalNumRecords, int recordsPerComponent) 
throws Exception {
+        StorageTestUtils.allowAllOps(primaryIndex);
+        StorageTestUtils.allowAllOps(secondaryIndex);
+        insertOp.open();
+        VSizeFrame frame = new VSizeFrame(ctx);
+        FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+        for (int i = 0; i < totalNumRecords; i++) {
+            // flush every RECORDS_PER_COMPONENT records
+            if (i % recordsPerComponent == 0 && i + 1 != totalNumRecords) {
+                if (tupleAppender.getTupleCount() > 0) {
+                    tupleAppender.write(insertOp, true);
+                }
+                StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, false);
+            }
+            ITupleReference tuple = tupleGenerator.next();
+            DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+        }
+        if (tupleAppender.getTupleCount() > 0) {
+            tupleAppender.write(insertOp, true);
+        }
+        insertOp.close();
+        nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+    }
+
+    private void checkComponentIds() throws HyracksDataException {
+        // check memory component
+        if (primaryIndex.isMemoryComponentsAllocated()) {
+            ILSMMemoryComponent primaryMemComponent = 
primaryIndex.getCurrentMemoryComponent();
+            ILSMMemoryComponent secondaryMemComponent = 
secondaryIndex.getCurrentMemoryComponent();
+            Assert.assertEquals(primaryMemComponent.getId(), 
secondaryMemComponent.getId());
+        }
+
+        List<ILSMDiskComponent> primaryDiskComponents = 
primaryIndex.getDiskComponents();
+        List<ILSMDiskComponent> secondaryDiskComponents = 
secondaryIndex.getDiskComponents();
+
+        Assert.assertEquals(primaryDiskComponents.size(), 
secondaryDiskComponents.size());
+        for (int i = 0; i < primaryDiskComponents.size(); i++) {
+            Assert.assertEquals(primaryDiskComponents.get(i).getId(), 
secondaryDiskComponents.get(i).getId());
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index c452548..e4373f6 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -57,6 +57,7 @@
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@@ -147,7 +148,7 @@
     }
 
     void unblockSearch(TestLsmBtree lsmBtree) {
-        lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+        lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
         lsmBtree.allowSearch(1);
     }
 
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index e7a455c..d9f2e20 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -57,6 +57,7 @@
 import org.apache.hyracks.api.test.CountAnswer;
 import org.apache.hyracks.api.test.FrameWriterTestUtils;
 import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -92,25 +93,20 @@
                     NoMergePolicyFactory.NAME, null, new 
InternalDatasetDetails(null, PartitioningStrategy.HASH,
                             PARTITIONING_KEYS, null, null, null, false, null),
                     null, DatasetType.INTERNAL, DATASET_ID, 0);
-    public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new 
ITestOpCallback<Semaphore>() {
-        @Override
-        public void before(Semaphore smeaphore) {
-            smeaphore.release();
-        }
-
-        @Override
-        public void after() {
-        }
-    };
 
     private StorageTestUtils() {
     }
 
     static void allowAllOps(TestLsmBtree lsmBtree) {
-        lsmBtree.addModifyCallback(ALLOW_CALLBACK);
-        lsmBtree.addFlushCallback(ALLOW_CALLBACK);
-        lsmBtree.addSearchCallback(ALLOW_CALLBACK);
-        lsmBtree.addMergeCallback(ALLOW_CALLBACK);
+        lsmBtree.clearModifyCallbacks();
+        lsmBtree.clearFlushCallbacks();
+        lsmBtree.clearSearchCallbacks();
+        lsmBtree.clearMergeCallbacks();
+
+        lsmBtree.addModifyCallback(AllowTestOpCallback.INSTANCE);
+        lsmBtree.addFlushCallback(AllowTestOpCallback.INSTANCE);
+        lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
+        lsmBtree.addMergeCallback(AllowTestOpCallback.INSTANCE);
     }
 
     public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, 
int partition)
@@ -121,8 +117,13 @@
 
     public static LSMInsertDeleteOperatorNodePushable 
getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
             throws HyracksDataException, RemoteException, ACIDException, 
AlgebricksException {
+        return getInsertPipeline(nc, ctx, null);
+    }
+
+    public static LSMInsertDeleteOperatorNodePushable 
getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+            Index secondaryIndex) throws HyracksDataException, 
RemoteException, ACIDException, AlgebricksException {
         return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, 
META_TYPE, null, KEY_INDEXES,
-                KEY_INDICATORS_LIST, STORAGE_MANAGER, null).getLeft();
+                KEY_INDICATORS_LIST, STORAGE_MANAGER, 
secondaryIndex).getLeft();
     }
 
     public static TupleGenerator getTupleGenerator() {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 1a61b8f..9de8f73 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -49,6 +49,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
@@ -330,6 +331,9 @@
     @Override
     public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int 
datasetId, int partition) {
         DatasetResource dataset = datasets.get(datasetId);
+        if (dataset == null) {
+            return null;
+        }
         ILSMComponentIdGenerator generator = 
dataset.getComponentIdGenerator(partition);
         if (generator == null) {
             populateOpTrackerAndIdGenerator(dataset, partition);
@@ -425,12 +429,26 @@
             }
             int partition = primaryOpTracker.getPartition();
             Collection<ILSMIndex> indexes = 
dsInfo.getDatasetPartitionOpenIndexes(partition);
+            ILSMIndex flushIndex = null;
+            for (ILSMIndex lsmIndex : indexes) {
+                if (!lsmIndex.isCurrentMutableComponentEmpty()) {
+                    flushIndex = lsmIndex;
+                    break;
+                }
+            }
+            if (flushIndex == null) {
+                // all open indexes are empty, nothing to flush
+                continue;
+            }
+            LSMComponentId componentId = (LSMComponentId) 
flushIndex.getCurrentMemoryComponent().getId();
             ILSMComponentIdGenerator idGenerator = 
getComponentIdGenerator(dsInfo.getDatasetID(), partition);
             idGenerator.refresh();
 
             if (dsInfo.isDurable()) {
+
                 synchronized (logRecord) {
-                    TransactionUtil.formFlushLogRecord(logRecord, 
dsInfo.getDatasetID(), null);
+                    TransactionUtil.formFlushLogRecord(logRecord, 
dsInfo.getDatasetID(), partition,
+                            componentId.getMinId(), componentId.getMaxId(), 
null);
                     try {
                         logManager.log(logRecord);
                     } catch (ACIDException e) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index f7f2806..8ed4bb6 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -38,6 +38,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
@@ -117,6 +118,7 @@
 
         if (needsFlush || flushOnExit) {
             //Make the current mutable components READABLE_UNWRITABLE to stop 
coming modify operations from entering them until the current flush is 
scheduled.
+            LSMComponentId primaryId = null;
             for (ILSMIndex lsmIndex : indexes) {
                 ILSMOperationTracker opTracker = 
lsmIndex.getOperationTracker();
                 synchronized (opTracker) {
@@ -124,7 +126,13 @@
                     if (memComponent.getState() == 
ComponentState.READABLE_WRITABLE && memComponent.isModified()) {
                         
memComponent.setState(ComponentState.READABLE_UNWRITABLE);
                     }
+                    if (lsmIndex.isPrimaryIndex()) {
+                        primaryId = (LSMComponentId) memComponent.getId();
+                    }
                 }
+            }
+            if (primaryId == null) {
+                throw new IllegalStateException("Primary index not found in 
dataset " + dsInfo.getDatasetID());
             }
             LogRecord logRecord = new LogRecord();
             flushOnExit = false;
@@ -133,7 +141,8 @@
                  * Generate a FLUSH log.
                  * Flush will be triggered when the log is written to disk by 
LogFlusher.
                  */
-                TransactionUtil.formFlushLogRecord(logRecord, datasetID, this);
+                TransactionUtil.formFlushLogRecord(logRecord, datasetID, 
partition, primaryId.getMinId(),
+                        primaryId.getMaxId(), this);
                 try {
                     logManager.log(logRecord);
                 } catch (ACIDException e) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index bacebf1..412981c 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -213,6 +213,13 @@
         }
     }
 
+    /**
+     * Used during the recovery process to force refresh the next component id
+     */
+    public void forceRefreshNextId() {
+        nextComponentIds[writeIndex] = idGenerator.getId();
+    }
+
     public synchronized void setFirstLSN(long firstLSN) {
         // We make sure that this method is only called on an empty component 
so the first LSN is not set incorrectly
         firstLSNs[writeIndex] = firstLSN;
@@ -258,7 +265,7 @@
     @Override
     public void recycled(ILSMMemoryComponent component, boolean 
componentSwitched) throws HyracksDataException {
         ILSMComponentId componentId = getLSMComponentId();
-        component.resetId(componentId);
+        component.resetId(componentId, false);
         if (componentSwitched) {
             recycleIndex = (recycleIndex + 1) % nextComponentIds.length;
         }
@@ -269,7 +276,7 @@
         if (component == lsmIndex.getCurrentMemoryComponent()) {
             // only set the component id for the first (current) memory 
component
             ILSMComponentId componentId = getLSMComponentId();
-            component.resetId(componentId);
+            component.resetId(componentId, false);
         }
     }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 7ddfdfb..e58a6fa 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -46,6 +46,8 @@
     int SEQ_NUM_LEN = Long.BYTES;
     int TYPE_LEN = Byte.BYTES;
     int UUID_LEN = Long.BYTES;
+    int FLUSHING_COMPONENT_MINID_LEN = Long.BYTES;
+    int FLUSHING_COMPONENT_MAXID_LEN = Long.BYTES;
 
     int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES;
     int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + 
PKHASH_LEN + PKSZ_LEN;
@@ -55,7 +57,8 @@
     int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
     int ENTITY_COMMIT_LOG_BASE_SIZE = ALL_RECORD_HEADER_LEN + 
ENTITYCOMMIT_UPDATE_HEADER_LEN + CHKSUM_LEN;
     int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER 
+ UPDATE_BODY_HEADER;
-    int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DatasetId.BYTES + CHKSUM_LEN;
+    int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DS_LEN + RS_PARTITION_LEN + 
FLUSHING_COMPONENT_MINID_LEN
+            + FLUSHING_COMPONENT_MAXID_LEN + CHKSUM_LEN;
     int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
     int MARKER_BASE_LOG_SIZE =
             ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + 
PRVLSN_LEN + LOGRCD_SZ_LEN;
@@ -176,4 +179,12 @@
      * @return the flag
      */
     boolean isReplicate();
+
+    long getFlushingComponentMinId();
+
+    void setFlushingComponentMinId(long flushingComponentMinId);
+
+    long getFlushingComponentMaxId();
+
+    void setFlushingComponentMaxId(long flushingComponentMaxId);
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 743a3fe..7e61266 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -81,6 +81,8 @@
     private long checksum;
     private long prevMarkerLSN;
     private ByteBuffer marker;
+    private long flushingComponentMinId;
+    private long flushingComponentMaxId;
     // ------------- fields in a log record (end) --------------//
     private final ILogMarkerCallback callback; // A callback for log mark 
operations
     private int PKFieldCnt;
@@ -141,6 +143,9 @@
                 break;
             case LogType.FLUSH:
                 buffer.putInt(datasetId);
+                buffer.putInt(resourcePartition);
+                buffer.putLong(flushingComponentMinId);
+                buffer.putLong(flushingComponentMaxId);
                 break;
             case LogType.MARKER:
                 buffer.putInt(datasetId);
@@ -238,13 +243,23 @@
         txnId = buffer.getLong();
         switch (logType) {
             case LogType.FLUSH:
+                if (buffer.remaining() < DS_LEN + RS_PARTITION_LEN + 
FLUSHING_COMPONENT_MINID_LEN
+                        + FLUSHING_COMPONENT_MAXID_LEN) {
+                    return RecordReadStatus.TRUNCATED;
+                }
+                datasetId = buffer.getInt();
+                resourcePartition = buffer.getInt();
+                flushingComponentMinId = buffer.getLong();
+                flushingComponentMaxId = buffer.getLong();
+                resourceId = 0l;
+                computeAndSetLogSize();
+                break;
+            case LogType.WAIT:
                 if (buffer.remaining() < ILogRecord.DS_LEN) {
                     return RecordReadStatus.TRUNCATED;
                 }
                 datasetId = buffer.getInt();
                 resourceId = 0l;
-                // fall throuh
-            case LogType.WAIT:
                 computeAndSetLogSize();
                 break;
             case LogType.JOB_COMMIT:
@@ -710,4 +725,24 @@
     public void setRequester(ILogRequester requester) {
         this.requester = requester;
     }
+
+    @Override
+    public long getFlushingComponentMinId() {
+        return flushingComponentMinId;
+    }
+
+    @Override
+    public void setFlushingComponentMinId(long flushingComponentMinId) {
+        this.flushingComponentMinId = flushingComponentMinId;
+    }
+
+    @Override
+    public long getFlushingComponentMaxId() {
+        return flushingComponentMaxId;
+    }
+
+    @Override
+    public void setFlushingComponentMaxId(long flushingComponentMaxId) {
+        this.flushingComponentMaxId = flushingComponentMaxId;
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index c3af0f3..690eeb6 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -45,10 +45,14 @@
         logRecord.computeAndSetLogSize();
     }
 
-    public static void formFlushLogRecord(LogRecord logRecord, int datasetId, 
PrimaryIndexOperationTracker opTracker) {
+    public static void formFlushLogRecord(LogRecord logRecord, int datasetId, 
int resourcePartition,
+            long flushingComponentMinId, long flushingComponentMaxId, 
PrimaryIndexOperationTracker opTracker) {
         logRecord.setLogType(LogType.FLUSH);
         logRecord.setTxnId(-1);
         logRecord.setDatasetId(datasetId);
+        logRecord.setResourcePartition(resourcePartition);
+        logRecord.setFlushingComponentMinId(flushingComponentMinId);
+        logRecord.setFlushingComponentMaxId(flushingComponentMaxId);
         logRecord.setOpTracker(opTracker);
         logRecord.computeAndSetLogSize();
     }
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
index 1c57d1b..2ab5b4e 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
@@ -263,9 +263,11 @@
 
     private void checkMemoryComponent(ILSMComponentId expected, 
ILSMMemoryComponent memoryComponent)
             throws HyracksDataException {
-        ArgumentCaptor<ILSMComponentId> argument = 
ArgumentCaptor.forClass(ILSMComponentId.class);
-        Mockito.verify(memoryComponent).resetId(argument.capture());
-        assertEquals(expected, argument.getValue());
+        ArgumentCaptor<ILSMComponentId> idArgument = 
ArgumentCaptor.forClass(ILSMComponentId.class);
+        ArgumentCaptor<Boolean> forceArgument = 
ArgumentCaptor.forClass(Boolean.class);
+        Mockito.verify(memoryComponent).resetId(idArgument.capture(), 
forceArgument.capture());
+        assertEquals(expected, idArgument.getValue());
+        assertEquals(false, forceArgument.getValue().booleanValue());
 
         Mockito.reset(memoryComponent);
     }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
index 5009614..6189e37 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
@@ -67,7 +67,9 @@
                     break;
                 case LogType.FLUSH:
                     RemoteLogRecord flushLog = new RemoteLogRecord();
-                    TransactionUtil.formFlushLogRecord(flushLog, 
reusableLog.getDatasetId(), null);
+                    TransactionUtil.formFlushLogRecord(flushLog, 
reusableLog.getDatasetId(),
+                            reusableLog.getResourcePartition(), 
reusableLog.getFlushingComponentMinId(),
+                            reusableLog.getFlushingComponentMaxId(), null);
                     flushLog.setRequester(this);
                     flushLog.setLogSource(LogSource.REMOTE);
                     flushLog.setMasterLsn(reusableLog.getLSN());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index c72d402..4ff6377 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -107,7 +107,9 @@
      * Reset the component Id of the memory component after it's recycled
      *
      * @param newId
+     * @param force
+     *      Whether to force reset the Id to skip sanity checks
      * @throws HyracksDataException
      */
-    void resetId(ILSMComponentId newId) throws HyracksDataException;
+    void resetId(ILSMComponentId newId, boolean force) throws 
HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 3fbef18..9596495 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -296,8 +296,8 @@
     }
 
     @Override
-    public void resetId(ILSMComponentId componentId) throws 
HyracksDataException {
-        if (this.componentId != null && !componentId.missing() // for backward 
compatibility
+    public void resetId(ILSMComponentId componentId, boolean force) throws 
HyracksDataException {
+        if (!force && this.componentId != null && !componentId.missing() // 
for backward compatibility
                 && this.componentId.compareTo(componentId) != 
IdCompareResult.LESS_THAN) {
             throw new IllegalStateException(
                     this + " receives illegal id. Old id " + this.componentId 
+ ", new id " + componentId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java
new file mode 100644
index 0000000..19a9872
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impl;
+
+import java.util.concurrent.Semaphore;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AllowTestOpCallback implements ITestOpCallback<Semaphore> {
+
+    public static final ITestOpCallback<Semaphore> INSTANCE = new 
AllowTestOpCallback();
+
+    private AllowTestOpCallback() {
+    }
+
+    @Override
+    public void before(Semaphore t) throws HyracksDataException {
+        t.release();
+    }
+
+    @Override
+    public void after() throws HyracksDataException {
+
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 3c781a6..1d4b7d6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -53,6 +53,7 @@
 public class TestLsmBtree extends LSMBTree {
 
     // Semaphores are used to control operations
+    // Operations are allowed by default.
     private final Semaphore modifySemaphore = new Semaphore(0);
     private final Semaphore searchSemaphore = new Semaphore(0);
     private final Semaphore flushSemaphore = new Semaphore(0);
@@ -91,6 +92,11 @@
                 filterFrameFactory, filterManager, 
bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy,
                 opTracker, ioScheduler, ioOperationCallbackFactory, 
needKeyDupCheck, btreeFields, filterFields, durable,
                 updateAware, tracer);
+
+        addModifyCallback(AllowTestOpCallback.INSTANCE);
+        addSearchCallback(AllowTestOpCallback.INSTANCE);
+        addFlushCallback(AllowTestOpCallback.INSTANCE);
+        addMergeCallback(AllowTestOpCallback.INSTANCE);
     }
 
     @Override
@@ -226,13 +232,13 @@
     }
 
     public void addModifyCallback(ITestOpCallback<Semaphore> modifyCallback) {
-        synchronized (mergeCallbacks) {
+        synchronized (modifyCallbacks) {
             modifyCallbacks.add(modifyCallback);
         }
     }
 
     public void clearModifyCallbacks() {
-        synchronized (mergeCallbacks) {
+        synchronized (modifyCallbacks) {
             modifyCallbacks.clear();
         }
     }
@@ -329,6 +335,18 @@
         }
     }
 
+    public void addIoAfterFinalizeCallback(ITestOpCallback<Void> callback) {
+        synchronized (ioAfterFinalizeCallbacks) {
+            ioAfterFinalizeCallbacks.add(callback);
+        }
+    }
+
+    public void clearIoAfterFinalizeCallbacks() {
+        synchronized (ioAfterFinalizeCallbacks) {
+            ioAfterFinalizeCallbacks.clear();
+        }
+    }
+
     @Override
     public void allocateMemoryComponents() throws HyracksDataException {
         synchronized (allocateComponentCallbacks) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2408
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I8c1fc2b209cfb9d3dafa216771d2b7032eb99e75
Gerrit-PatchSet: 7
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <cl...@uci.edu>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Ian Maxon <ima...@apache.org>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Luo Chen <cl...@uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to