Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2368

Change subject: [ASTERIXDB-2281][RT] Consider reserved txn ids when determining 
max
......................................................................

[ASTERIXDB-2281][RT] Consider reserved txn ids when determining max

Change-Id: I88f14fb351976db239ed752693e59882da62d588
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
R 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
A 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestMessage.java
A 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestResponseMessage.java
R 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
20 files changed, 291 insertions(+), 131 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/68/2368/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 1de6938..85d33ee 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -513,4 +513,9 @@
     public ICoordinationService getCoordinationService() {
         return NoOpCoordinationService.INSTANCE;
     }
+
+    @Override
+    public long getMaxTxnId() {
+        return MetadataManager.INSTANCE.getMaxTxnId();
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index a6f10ca..6096843 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -51,38 +51,40 @@
     public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
         INCMessageBroker broker = (INCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
         IControllerService cs = 
appCtx.getServiceContext().getControllerService();
-        boolean success = true;
-        try {
-            Throwable exception = null;
+        cs.getExecutor().submit(() -> {
+            boolean success = true;
             try {
-                for (INCLifecycleTask task : tasks) {
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.log(Level.INFO, "Starting startup task: " + 
task);
+                Throwable exception = null;
+                try {
+                    for (INCLifecycleTask task : tasks) {
+                        if (LOGGER.isInfoEnabled()) {
+                            LOGGER.log(Level.INFO, "Starting startup task: " + 
task);
+                        }
+                        task.perform(getCcId(), cs);
+                        if (LOGGER.isInfoEnabled()) {
+                            LOGGER.log(Level.INFO, "Completed startup task: " 
+ task);
+                        }
                     }
-                    task.perform(getCcId(), cs);
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.log(Level.INFO, "Completed startup task: " + 
task);
-                    }
+                } catch (Throwable e) { //NOSONAR all startup failures should 
be reported to CC
+                    LOGGER.log(Level.ERROR, "Failed during startup task", e);
+                    success = false;
+                    exception = e;
                 }
-            } catch (Throwable e) { //NOSONAR all startup failures should be 
reported to CC
-                LOGGER.log(Level.ERROR, "Failed during startup task", e);
-                success = false;
-                exception = e;
+                NCLifecycleTaskReportMessage result = new 
NCLifecycleTaskReportMessage(nodeId, success);
+                result.setException(exception);
+                try {
+                    broker.sendMessageToCC(getCcId(), result);
+                } catch (Exception e) {
+                    success = false;
+                    LOGGER.log(Level.ERROR, "Failed sending message to cc", e);
+                }
+            } finally {
+                if (!success) {
+                    // stop NC so that it can be started again
+                    ExitUtil.exit(NCShutdownHook.FAILED_TO_STARTUP_EXIT_CODE);
+                }
             }
-            NCLifecycleTaskReportMessage result = new 
NCLifecycleTaskReportMessage(nodeId, success);
-            result.setException(exception);
-            try {
-                broker.sendMessageToCC(getCcId(), result);
-            } catch (Exception e) {
-                success = false;
-                LOGGER.log(Level.ERROR, "Failed sending message to cc", e);
-            }
-        } finally {
-            if (!success) {
-                // stop NC so that it can be started again
-                ExitUtil.exit(NCShutdownHook.FAILED_TO_STARTUP_EXIT_CODE);
-            }
-        }
+        });
     }
 
     public String getNodeId() {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 551e6aa..699892e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -178,7 +178,7 @@
             throws AlgebricksException, IOException {
         return new CcApplicationContext(ccServiceCtx, getHcc(), 
libraryManager, () -> MetadataManager.INSTANCE,
                 globalRecoveryManager, lifecycleCoordinator, new 
ActiveNotificationHandler(), componentProvider,
-                new MetadataLockManager(), 
MetadataManager::getTxnIdBlockFactory);
+                new MetadataLockManager());
     }
 
     protected GlobalRecoveryManager createGlobalRecoveryManager() throws 
Exception {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index a02bda5..ee9b4eb 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -123,4 +123,6 @@
     IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
 
     IReplicaManager getReplicaManager();
+
+    long getMaxTxnId();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
index 3c60432..be4a1f8 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
@@ -28,6 +28,8 @@
      */
     TxnId create() throws AlgebricksException;
 
+    long getIdBlock(int blockSize);
+
     /**
      * Ensure that future transaction ids are larger than the supplied id
      *
@@ -35,4 +37,11 @@
      *            the value to ensure future created transaction ids are 
larger than
      */
     void ensureMinimumId(long id) throws AlgebricksException;
+
+    /**
+     * The highest transaction id this factory has created
+     *
+     * @return the max transaction id
+     */
+    long getMaxTxnId();
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
similarity index 71%
rename from 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
rename to 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
index 82bbe6b..cab70c4 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
@@ -16,30 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.runtime.utils;
+package org.apache.asterix.metadata;
 
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
 
-import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.runtime.message.TxnIdBlockRequestMessage;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 /**
  * Represents a factory to generate unique transaction IDs.
  */
-class CcTxnIdFactory implements ITxnIdFactory {
-    private static final int TXN_BLOCK_SIZE = 1024;
+class CachingTxnIdFactory implements ITxnIdFactory {
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private final Supplier<ILongBlockFactory> blockFactorySupplier;
+    private final INcApplicationContext appCtx;
     private volatile Block block = new Block(0, 0);
 
-    public CcTxnIdFactory(Supplier<ILongBlockFactory> blockFactorySupplier) {
-        this.blockFactorySupplier = blockFactorySupplier;
+    public CachingTxnIdFactory(INcApplicationContext appCtx) {
+        this.appCtx = appCtx;
     }
 
     @Override
@@ -50,14 +50,30 @@
             } catch (BlockExhaustedException ex) {
                 // retry
                 LOGGER.info("block exhausted; obtaining new block from 
supplier");
-                block = new 
Block(blockFactorySupplier.get().getBlock(TXN_BLOCK_SIZE), TXN_BLOCK_SIZE);
+                TxnIdBlockRequestMessage.Block newBlock;
+                try {
+                    newBlock = TxnIdBlockRequestMessage.send(appCtx);
+                } catch (HyracksDataException e) {
+                    throw new AlgebricksException(e);
+                }
+                block = new Block(newBlock.getStartingId(), 
newBlock.getBlockSize());
             }
         }
     }
 
     @Override
     public void ensureMinimumId(long id) throws AlgebricksException {
-        blockFactorySupplier.get().ensureMinimum(id);
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getIdBlock(int blockSize) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getMaxTxnId() {
+        return block.endExclusive - 1;
     }
 
     static class Block {
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index b4b304e..2f9be65 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -92,7 +93,7 @@
  * cluster, i.e., metadata transaction ids shall never "accidentally" overlap
  * with transaction ids of regular jobs or other metadata transactions.
  */
-public abstract class MetadataManager implements IMetadataManager, 
ILongBlockFactory {
+public abstract class MetadataManager implements IMetadataManager {
     private final MetadataCache cache = new MetadataCache();
     protected final Collection<IAsterixStateProxy> proxies;
     protected IMetadataNode metadataNode;
@@ -119,13 +120,19 @@
         this.metadataLatch = new ReentrantReadWriteLock(true);
     }
 
+    protected abstract TxnId createTxnId();
+
     @Override
     public void init() throws HyracksDataException {
         // no op
     }
 
     @Override
-    public abstract MetadataTransactionContext beginTransaction() throws 
RemoteException, ACIDException;
+    public MetadataTransactionContext beginTransaction() throws 
RemoteException {
+        TxnId txnId = createTxnId();
+        metadataNode.beginTransaction(txnId);
+        return new MetadataTransactionContext(txnId);
+    }
 
     @Override
     public void commitTransaction(MetadataTransactionContext ctx) throws 
RemoteException, ACIDException {
@@ -997,24 +1004,6 @@
         rebindMetadataNode = true;
     }
 
-    @Override
-    public void ensureMinimum(long value) throws AlgebricksException {
-        try {
-            metadataNode.ensureMinimumTxnId(value);
-        } catch (RemoteException e) {
-            throw new 
MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
-        }
-    }
-
-    @Override
-    public long getBlock(int blockSize) throws AlgebricksException {
-        try {
-            return metadataNode.reserveTxnIdBlock(blockSize);
-        } catch (RemoteException e) {
-            throw new 
MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
-        }
-    }
-
     public static ILongBlockFactory getTxnIdBlockFactory() {
         try {
             INSTANCE.init();
@@ -1046,15 +1035,19 @@
         }
 
         @Override
-        public MetadataTransactionContext beginTransaction() throws 
RemoteException {
+        protected TxnId createTxnId() {
             TxnId txnId;
             try {
                 txnId = appCtx.getTxnIdFactory().create();
             } catch (AlgebricksException e) {
                 throw new ACIDException(e);
             }
-            metadataNode.beginTransaction(txnId);
-            return new MetadataTransactionContext(txnId);
+            return txnId;
+        }
+
+        @Override
+        public long getMaxTxnId() {
+            return appCtx.getTxnIdFactory().getMaxTxnId();
         }
 
         @Override
@@ -1083,15 +1076,25 @@
     }
 
     private static class NCMetadataManagerImpl extends MetadataManager {
+        private final ITxnIdFactory txnIdFactory;
+
         NCMetadataManagerImpl(Collection<IAsterixStateProxy> proxies, 
MetadataNode metadataNode) {
             super(proxies, metadataNode);
+            txnIdFactory = metadataNode.getTxnIdFactory();
         }
 
         @Override
-        public MetadataTransactionContext beginTransaction() throws 
RemoteException {
-            TxnId txnId = new TxnId(metadataNode.reserveTxnIdBlock(1));
-            metadataNode.beginTransaction(txnId);
-            return new MetadataTransactionContext(txnId);
+        protected TxnId createTxnId() {
+            try {
+                return txnIdFactory.create();
+            } catch (AlgebricksException e) {
+                throw new ACIDException(e);
+            }
+        }
+
+        @Override
+        public long getMaxTxnId() {
+            return txnIdFactory.getMaxTxnId();
         }
     }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 72d5cf5..69acce10 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -39,6 +39,7 @@
 import org.apache.asterix.common.transactions.ITransactionContext;
 import 
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
@@ -132,7 +133,7 @@
     private IDatasetLifecycleManager datasetLifecycleManager;
     private ITransactionSubsystem transactionSubsystem;
     private int metadataStoragePartition;
-    private transient BulkTxnIdFactory txnIdFactory;
+    private transient CachingTxnIdFactory txnIdFactory;
     // core only
     private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
     // extension only
@@ -158,17 +159,7 @@
                 }
             }
         }
-        this.txnIdFactory = new BulkTxnIdFactory();
-    }
-
-    @Override
-    public void ensureMinimumTxnId(long maxId) throws ACIDException, 
RemoteException {
-        txnIdFactory.ensureMinimumId(maxId);
-    }
-
-    @Override
-    public long reserveTxnIdBlock(int blockSize) throws ACIDException, 
RemoteException {
-        return txnIdFactory.reserveIdBlock(blockSize);
+        this.txnIdFactory = new CachingTxnIdFactory(runtimeContext);
     }
 
     @Override
@@ -2004,4 +1995,8 @@
             throw new AlgebricksException(e);
         }
     }
+
+    public ITxnIdFactory getTxnIdFactory() {
+        return txnIdFactory;
+    }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index b2d0d3e..e030db3 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -724,4 +724,6 @@
 
     List<FeedConnection> getFeedConections(MetadataTransactionContext ctx, 
String dataverseName, String feedName)
             throws AlgebricksException;
+
+    long getMaxTxnId();
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index c3f9d7f..f6abc53 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -26,7 +26,6 @@
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.transactions.ITxnIdBlockProvider;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -52,28 +51,7 @@
  * lock/access metadata shall always go through the MetadataManager, and should
  * never call methods on the MetadataNode directly for any reason.
  */
-public interface IMetadataNode extends Remote, Serializable, 
ITxnIdBlockProvider {
-
-    /**
-     * Allocates a block of transaction ids of specified block size
-     *
-     * @param maxId
-     *            The txn id to ensure future txn ids are larger than
-     * @throws ACIDException
-     * @throws RemoteException
-     */
-    void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException;
-
-    /**
-     * Allocates a block of transaction ids of specified block size
-     *
-     * @param blockSize
-     *            The size of the transaction id block to reserve
-     * @return the start of the reserved block
-     * @throws ACIDException
-     * @throws RemoteException
-     */
-    long reserveTxnIdBlock(int blockSize) throws ACIDException, 
RemoteException;
+public interface IMetadataNode extends Remote, Serializable {
 
     /**
      * Begins a local transaction against the metadata.
@@ -828,5 +806,4 @@
 
     List<FeedConnection> getFeedConnections(TxnId txnId, String dataverseName, 
String feedName)
             throws AlgebricksException, RemoteException;
-
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index db2a044..50fa083 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -65,7 +65,8 @@
         INcApplicationContext appContext = (INcApplicationContext) 
ncs.getApplicationContext();
         long maxResourceId = 
Math.max(appContext.getLocalResourceRepository().maxId(),
                 
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
-        long maxTxnId = 
appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId();
+        long maxTxnId = Math.max(appContext.getMaxTxnId(),
+                
appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId());
         long maxJobId = ncs.getMaxJobId(ccId);
         ReportLocalCountersMessage countersMessage =
                 new ReportLocalCountersMessage(ncs.getId(), maxResourceId, 
maxTxnId, maxJobId);
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index a2f4aa1..3e172c7 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -39,20 +39,20 @@
     public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
         try {
             ICCMessageBroker broker = (ICCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
-            ResourceIdRequestResponseMessage reponse = new 
ResourceIdRequestResponseMessage();
+            ResourceIdRequestResponseMessage response = new 
ResourceIdRequestResponseMessage();
             IClusterStateManager clusterStateManager = 
appCtx.getClusterStateManager();
             if (!clusterStateManager.isClusterActive()) {
-                reponse.setResourceId(-1);
-                reponse.setException(new Exception("Cannot generate global 
resource id when cluster is not active."));
+                response.setResourceId(-1);
+                response.setException(new Exception("Cannot generate global 
resource id when cluster is not active."));
             } else {
                 IResourceIdManager resourceIdManager = 
appCtx.getResourceIdManager();
-                reponse.setResourceId(resourceIdManager.createResourceId());
-                if (reponse.getResourceId() < 0) {
-                    reponse.setException(new Exception("One or more nodes has 
not reported max resource id."));
+                response.setResourceId(resourceIdManager.createResourceId());
+                if (response.getResourceId() < 0) {
+                    response.setException(new Exception("One or more nodes has 
not reported max resource id."));
                 }
                 requestMaxResourceID(clusterStateManager, resourceIdManager, 
broker);
             }
-            broker.sendApplicationMessageToNC(reponse, src);
+            broker.sendApplicationMessageToNC(response, src);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestMessage.java
new file mode 100644
index 0000000..75b9881
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestMessage.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class TxnIdBlockRequestMessage implements ICcAddressedMessage {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int BLOCK_SIZE = 100;
+    private static final long serialVersionUID = 1L;
+
+    private static BlockingQueue<TxnIdBlockRequestResponseMessage> blockQueue 
= new LinkedBlockingQueue<>();
+    private final String nodeId;
+    private final int blockSizeRequested;
+
+    public TxnIdBlockRequestMessage(String nodeId, int blockSizeRequested) {
+        this.nodeId = nodeId;
+        this.blockSizeRequested = blockSizeRequested;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException {
+        try {
+            ICCMessageBroker broker = (ICCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+            long startingId = 
appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested);
+            TxnIdBlockRequestResponseMessage response =
+                    new TxnIdBlockRequestResponseMessage(startingId, 
blockSizeRequested);
+            broker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static Block send(INcApplicationContext ncs) throws 
HyracksDataException {
+        TxnIdBlockRequestMessage blockRequestMessage =
+                new 
TxnIdBlockRequestMessage(ncs.getServiceContext().getNodeId(), BLOCK_SIZE);
+        try {
+            ((INCMessageBroker) 
ncs.getServiceContext().getMessageBroker()).sendMessageToPrimaryCC(blockRequestMessage);
+            TxnIdBlockRequestResponseMessage response = blockQueue.take();
+            return new Block(response.getStartingId(), 
response.getBlockSize());
+        } catch (Exception e) {
+            LOGGER.log(Level.ERROR, "Unable to request transaction id block", 
e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    static void addResponse(TxnIdBlockRequestResponseMessage response) {
+        blockQueue.offer(response);
+    }
+
+    @Override
+    public String toString() {
+        return TxnIdBlockRequestMessage.class.getSimpleName();
+    }
+
+    public static class Block {
+
+        private final long startingId;
+        private final int blockSize;
+
+        public Block(long startingId, int blockSize) {
+            this.startingId = startingId;
+            this.blockSize = blockSize;
+        }
+
+        public long getStartingId() {
+            return startingId;
+        }
+
+        public int getBlockSize() {
+            return blockSize;
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestResponseMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestResponseMessage.java
new file mode 100644
index 0000000..ab01a35
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestResponseMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TxnIdBlockRequestResponseMessage implements INcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+    private final long startingId;
+    private final int blockSize;
+
+    public TxnIdBlockRequestResponseMessage(long startingId, int blockSize) {
+        this.startingId = startingId;
+        this.blockSize = blockSize;
+    }
+
+    public long getStartingId() {
+        return startingId;
+    }
+
+    public int getBlockSize() {
+        return blockSize;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        TxnIdBlockRequestMessage.addResponse(this);
+    }
+
+    @Override
+    public String toString() {
+        return TxnIdBlockRequestResponseMessage.class.getSimpleName();
+    }
+}
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
similarity index 88%
rename from 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
rename to 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
index 8ac6b63..542bc17 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.asterix.metadata;
+package org.apache.asterix.runtime.utils;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -33,7 +33,8 @@
         return new TxnId(maxId.incrementAndGet());
     }
 
-    public long reserveIdBlock(int blockSize) {
+    @Override
+    public long getIdBlock(int blockSize) {
         if (blockSize < 1) {
             throw new IllegalArgumentException("block size cannot be smaller 
than 1, but was " + blockSize);
         }
@@ -44,4 +45,9 @@
     public void ensureMinimumId(long id) {
         this.maxId.getAndUpdate(next -> Math.max(next, id));
     }
+
+    @Override
+    public long getMaxTxnId() {
+        return maxId.get();
+    }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index b83df6c..4157e16 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,7 +24,6 @@
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.api.INodeJobTracker;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
 import org.apache.asterix.common.transactions.ITxnIdFactory;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
@@ -93,8 +92,7 @@
             ILibraryManager libraryManager, Supplier<IMetadataBootstrap> 
metadataBootstrapSupplier,
             IGlobalRecoveryManager globalRecoveryManager, 
INcLifecycleCoordinator ftStrategy,
             IJobLifecycleListener activeLifeCycleListener, 
IStorageComponentProvider storageComponentProvider,
-            IMetadataLockManager mdLockManager, Supplier<ILongBlockFactory> 
txnIdBlockSupplier)
-            throws AlgebricksException, IOException {
+            IMetadataLockManager mdLockManager) throws AlgebricksException, 
IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
         this.libraryManager = libraryManager;
@@ -122,7 +120,8 @@
         clusterStateManager.setCcAppCtx(this);
         this.resourceIdManager = new ResourceIdManager(clusterStateManager);
         nodeJobTracker = new NodeJobTracker();
-        txnIdFactory = new CcTxnIdFactory(txnIdBlockSupplier);
+        txnIdFactory = new BulkTxnIdFactory();
+
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 58b4b27..d0e4655 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -22,9 +22,6 @@
 import java.io.Serializable;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 /**
  * The main execution time exception type for runtime errors in a hyracks 
environment
@@ -32,7 +29,6 @@
 public class HyracksDataException extends HyracksException {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = LogManager.getLogger();
 
     public static HyracksDataException create(Throwable cause) {
         if (cause instanceof HyracksDataException || cause == null) {
@@ -40,11 +36,8 @@
         } else if (cause instanceof Error) {
             // don't wrap errors, allow them to propagate
             throw (Error) cause;
-        } else if (cause instanceof InterruptedException && 
!Thread.currentThread().isInterrupted()) {
-            // TODO(mblow): why not force interrupt on current thread?
-            LOGGER.log(Level.WARN,
-                    "Wrapping an InterruptedException in HyracksDataException 
and current thread is not interrupted",
-                    cause);
+        } else if (cause instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
         }
         return new HyracksDataException(cause);
     }
@@ -65,10 +58,8 @@
             // don't suppress errors into a HyracksDataException, allow them 
to propagate
             th.addSuppressed(root);
             throw (Error) th;
-        } else if (th instanceof InterruptedException && 
!Thread.currentThread().isInterrupted()) {
-            // TODO(mblow): why not force interrupt on current thread?
-            LOGGER.log(Level.WARN, "Suppressing an InterruptedException in a 
HyracksDataException and current "
-                    + "thread is not interrupted", th);
+        } else if (th instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
         }
         root.addSuppressed(th);
         return root;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index a95ae3d..5e3c3d4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 5c6d078..3d505f3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1314,6 +1314,7 @@
     }
 
     public static class ShutdownResponseFunction extends Function {
+        private static final long serialVersionUID = 1L;
 
         private final String nodeId;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ae40ea3..027316e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.deployment.DeploymentId;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2368
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I88f14fb351976db239ed752693e59882da62d588
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to