Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2368
Change subject: [ASTERIXDB-2281][RT] Consider reserved txn ids when determining max
......................................................................
[ASTERIXDB-2281][RT] Consider reserved txn ids when determining max
Change-Id: I88f14fb351976db239ed752693e59882da62d588
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
R asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestMessage.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestResponseMessage.java
R asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
20 files changed, 291 insertions(+), 131 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/68/2368/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 1de6938..85d33ee 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -513,4 +513,9 @@
public ICoordinationService getCoordinationService() {
return NoOpCoordinationService.INSTANCE;
}
+
+ @Override
+ public long getMaxTxnId() {
+ return MetadataManager.INSTANCE.getMaxTxnId();
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index a6f10ca..6096843 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -51,38 +51,40 @@
public void handle(INcApplicationContext appCtx) throws
HyracksDataException, InterruptedException {
INCMessageBroker broker = (INCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
IControllerService cs =
appCtx.getServiceContext().getControllerService();
- boolean success = true;
- try {
- Throwable exception = null;
+ cs.getExecutor().submit(() -> {
+ boolean success = true;
try {
- for (INCLifecycleTask task : tasks) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Starting startup task: " +
task);
+ Throwable exception = null;
+ try {
+ for (INCLifecycleTask task : tasks) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.log(Level.INFO, "Starting startup task: " +
task);
+ }
+ task.perform(getCcId(), cs);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.log(Level.INFO, "Completed startup task: "
+ task);
+ }
}
- task.perform(getCcId(), cs);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Completed startup task: " +
task);
- }
+ } catch (Throwable e) { //NOSONAR all startup failures should
be reported to CC
+ LOGGER.log(Level.ERROR, "Failed during startup task", e);
+ success = false;
+ exception = e;
}
- } catch (Throwable e) { //NOSONAR all startup failures should be
reported to CC
- LOGGER.log(Level.ERROR, "Failed during startup task", e);
- success = false;
- exception = e;
+ NCLifecycleTaskReportMessage result = new
NCLifecycleTaskReportMessage(nodeId, success);
+ result.setException(exception);
+ try {
+ broker.sendMessageToCC(getCcId(), result);
+ } catch (Exception e) {
+ success = false;
+ LOGGER.log(Level.ERROR, "Failed sending message to cc", e);
+ }
+ } finally {
+ if (!success) {
+ // stop NC so that it can be started again
+ ExitUtil.exit(NCShutdownHook.FAILED_TO_STARTUP_EXIT_CODE);
+ }
}
- NCLifecycleTaskReportMessage result = new
NCLifecycleTaskReportMessage(nodeId, success);
- result.setException(exception);
- try {
- broker.sendMessageToCC(getCcId(), result);
- } catch (Exception e) {
- success = false;
- LOGGER.log(Level.ERROR, "Failed sending message to cc", e);
- }
- } finally {
- if (!success) {
- // stop NC so that it can be started again
- ExitUtil.exit(NCShutdownHook.FAILED_TO_STARTUP_EXIT_CODE);
- }
- }
+ });
}
public String getNodeId() {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 551e6aa..699892e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -178,7 +178,7 @@
throws AlgebricksException, IOException {
return new CcApplicationContext(ccServiceCtx, getHcc(),
libraryManager, () -> MetadataManager.INSTANCE,
globalRecoveryManager, lifecycleCoordinator, new
ActiveNotificationHandler(), componentProvider,
- new MetadataLockManager(),
MetadataManager::getTxnIdBlockFactory);
+ new MetadataLockManager());
}
protected GlobalRecoveryManager createGlobalRecoveryManager() throws
Exception {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index a02bda5..ee9b4eb 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -123,4 +123,6 @@
IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
IReplicaManager getReplicaManager();
+
+ long getMaxTxnId();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
index 3c60432..be4a1f8 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java
@@ -28,6 +28,8 @@
*/
TxnId create() throws AlgebricksException;
+ long getIdBlock(int blockSize);
+
/**
* Ensure that future transaction ids are larger than the supplied id
*
@@ -35,4 +37,11 @@
* the value to ensure future created transaction ids are
larger than
*/
void ensureMinimumId(long id) throws AlgebricksException;
+
+ /**
+ * The highest transaction id this factory has created
+ *
+ * @return the max transaction id
+ */
+ long getMaxTxnId();
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
similarity index 71%
rename from
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
rename to
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
index 82bbe6b..cab70c4 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcTxnIdFactory.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java
@@ -16,30 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.utils;
+package org.apache.asterix.metadata;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.runtime.message.TxnIdBlockRequestMessage;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* Represents a factory to generate unique transaction IDs.
*/
-class CcTxnIdFactory implements ITxnIdFactory {
- private static final int TXN_BLOCK_SIZE = 1024;
+class CachingTxnIdFactory implements ITxnIdFactory {
private static final Logger LOGGER = LogManager.getLogger();
- private final Supplier<ILongBlockFactory> blockFactorySupplier;
+ private final INcApplicationContext appCtx;
private volatile Block block = new Block(0, 0);
- public CcTxnIdFactory(Supplier<ILongBlockFactory> blockFactorySupplier) {
- this.blockFactorySupplier = blockFactorySupplier;
+ public CachingTxnIdFactory(INcApplicationContext appCtx) {
+ this.appCtx = appCtx;
}
@Override
@@ -50,14 +50,30 @@
} catch (BlockExhaustedException ex) {
// retry
LOGGER.info("block exhausted; obtaining new block from
supplier");
- block = new
Block(blockFactorySupplier.get().getBlock(TXN_BLOCK_SIZE), TXN_BLOCK_SIZE);
+ TxnIdBlockRequestMessage.Block newBlock;
+ try {
+ newBlock = TxnIdBlockRequestMessage.send(appCtx);
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ block = new Block(newBlock.getStartingId(),
newBlock.getBlockSize());
}
}
}
@Override
public void ensureMinimumId(long id) throws AlgebricksException {
- blockFactorySupplier.get().ensureMinimum(id);
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getIdBlock(int blockSize) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getMaxTxnId() {
+ return block.endExclusive - 1;
}
static class Block {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index b4b304e..2f9be65 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -36,6 +36,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.transactions.ILongBlockFactory;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -92,7 +93,7 @@
* cluster, i.e., metadata transaction ids shall never "accidentally" overlap
* with transaction ids of regular jobs or other metadata transactions.
*/
-public abstract class MetadataManager implements IMetadataManager,
ILongBlockFactory {
+public abstract class MetadataManager implements IMetadataManager {
private final MetadataCache cache = new MetadataCache();
protected final Collection<IAsterixStateProxy> proxies;
protected IMetadataNode metadataNode;
@@ -119,13 +120,19 @@
this.metadataLatch = new ReentrantReadWriteLock(true);
}
+ protected abstract TxnId createTxnId();
+
@Override
public void init() throws HyracksDataException {
// no op
}
@Override
- public abstract MetadataTransactionContext beginTransaction() throws
RemoteException, ACIDException;
+ public MetadataTransactionContext beginTransaction() throws
RemoteException {
+ TxnId txnId = createTxnId();
+ metadataNode.beginTransaction(txnId);
+ return new MetadataTransactionContext(txnId);
+ }
@Override
public void commitTransaction(MetadataTransactionContext ctx) throws
RemoteException, ACIDException {
@@ -997,24 +1004,6 @@
rebindMetadataNode = true;
}
- @Override
- public void ensureMinimum(long value) throws AlgebricksException {
- try {
- metadataNode.ensureMinimumTxnId(value);
- } catch (RemoteException e) {
- throw new
MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
- }
- }
-
- @Override
- public long getBlock(int blockSize) throws AlgebricksException {
- try {
- return metadataNode.reserveTxnIdBlock(blockSize);
- } catch (RemoteException e) {
- throw new
MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
- }
- }
-
public static ILongBlockFactory getTxnIdBlockFactory() {
try {
INSTANCE.init();
@@ -1046,15 +1035,19 @@
}
@Override
- public MetadataTransactionContext beginTransaction() throws
RemoteException {
+ protected TxnId createTxnId() {
TxnId txnId;
try {
txnId = appCtx.getTxnIdFactory().create();
} catch (AlgebricksException e) {
throw new ACIDException(e);
}
- metadataNode.beginTransaction(txnId);
- return new MetadataTransactionContext(txnId);
+ return txnId;
+ }
+
+ @Override
+ public long getMaxTxnId() {
+ return appCtx.getTxnIdFactory().getMaxTxnId();
}
@Override
@@ -1083,15 +1076,25 @@
}
private static class NCMetadataManagerImpl extends MetadataManager {
+ private final ITxnIdFactory txnIdFactory;
+
NCMetadataManagerImpl(Collection<IAsterixStateProxy> proxies,
MetadataNode metadataNode) {
super(proxies, metadataNode);
+ txnIdFactory = metadataNode.getTxnIdFactory();
}
@Override
- public MetadataTransactionContext beginTransaction() throws
RemoteException {
- TxnId txnId = new TxnId(metadataNode.reserveTxnIdBlock(1));
- metadataNode.beginTransaction(txnId);
- return new MetadataTransactionContext(txnId);
+ protected TxnId createTxnId() {
+ try {
+ return txnIdFactory.create();
+ } catch (AlgebricksException e) {
+ throw new ACIDException(e);
+ }
+ }
+
+ @Override
+ public long getMaxTxnId() {
+ return txnIdFactory.getMaxTxnId();
}
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 72d5cf5..69acce10 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -39,6 +39,7 @@
import org.apache.asterix.common.transactions.ITransactionContext;
import
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.ImmutableDatasetId;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
@@ -132,7 +133,7 @@
private IDatasetLifecycleManager datasetLifecycleManager;
private ITransactionSubsystem transactionSubsystem;
private int metadataStoragePartition;
- private transient BulkTxnIdFactory txnIdFactory;
+ private transient CachingTxnIdFactory txnIdFactory;
// core only
private transient MetadataTupleTranslatorProvider tupleTranslatorProvider;
// extension only
@@ -158,17 +159,7 @@
}
}
}
- this.txnIdFactory = new BulkTxnIdFactory();
- }
-
- @Override
- public void ensureMinimumTxnId(long maxId) throws ACIDException,
RemoteException {
- txnIdFactory.ensureMinimumId(maxId);
- }
-
- @Override
- public long reserveTxnIdBlock(int blockSize) throws ACIDException,
RemoteException {
- return txnIdFactory.reserveIdBlock(blockSize);
+ this.txnIdFactory = new CachingTxnIdFactory(runtimeContext);
}
@Override
@@ -2004,4 +1995,8 @@
throw new AlgebricksException(e);
}
}
+
+ public ITxnIdFactory getTxnIdFactory() {
+ return txnIdFactory;
+ }
}
\ No newline at end of file
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index b2d0d3e..e030db3 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -724,4 +724,6 @@
List<FeedConnection> getFeedConections(MetadataTransactionContext ctx,
String dataverseName, String feedName)
throws AlgebricksException;
+
+ long getMaxTxnId();
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index c3f9d7f..f6abc53 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
-import org.apache.asterix.common.transactions.ITxnIdBlockProvider;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -52,28 +51,7 @@
* lock/access metadata shall always go through the MetadataManager, and should
* never call methods on the MetadataNode directly for any reason.
*/
-public interface IMetadataNode extends Remote, Serializable,
ITxnIdBlockProvider {
-
- /**
- * Allocates a block of transaction ids of specified block size
- *
- * @param maxId
- * The txn id to ensure future txn ids are larger than
- * @throws ACIDException
- * @throws RemoteException
- */
- void ensureMinimumTxnId(long maxId) throws ACIDException, RemoteException;
-
- /**
- * Allocates a block of transaction ids of specified block size
- *
- * @param blockSize
- * The size of the transaction id block to reserve
- * @return the start of the reserved block
- * @throws ACIDException
- * @throws RemoteException
- */
- long reserveTxnIdBlock(int blockSize) throws ACIDException,
RemoteException;
+public interface IMetadataNode extends Remote, Serializable {
/**
* Begins a local transaction against the metadata.
@@ -828,5 +806,4 @@
List<FeedConnection> getFeedConnections(TxnId txnId, String dataverseName,
String feedName)
throws AlgebricksException, RemoteException;
-
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index db2a044..50fa083 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -65,7 +65,8 @@
INcApplicationContext appContext = (INcApplicationContext)
ncs.getApplicationContext();
long maxResourceId =
Math.max(appContext.getLocalResourceRepository().maxId(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
- long maxTxnId =
appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId();
+ long maxTxnId = Math.max(appContext.getMaxTxnId(),
+
appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId());
long maxJobId = ncs.getMaxJobId(ccId);
ReportLocalCountersMessage countersMessage =
new ReportLocalCountersMessage(ncs.getId(), maxResourceId,
maxTxnId, maxJobId);
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index a2f4aa1..3e172c7 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -39,20 +39,20 @@
public void handle(ICcApplicationContext appCtx) throws
HyracksDataException, InterruptedException {
try {
ICCMessageBroker broker = (ICCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
- ResourceIdRequestResponseMessage reponse = new
ResourceIdRequestResponseMessage();
+ ResourceIdRequestResponseMessage response = new
ResourceIdRequestResponseMessage();
IClusterStateManager clusterStateManager =
appCtx.getClusterStateManager();
if (!clusterStateManager.isClusterActive()) {
- reponse.setResourceId(-1);
- reponse.setException(new Exception("Cannot generate global
resource id when cluster is not active."));
+ response.setResourceId(-1);
+ response.setException(new Exception("Cannot generate global
resource id when cluster is not active."));
} else {
IResourceIdManager resourceIdManager =
appCtx.getResourceIdManager();
- reponse.setResourceId(resourceIdManager.createResourceId());
- if (reponse.getResourceId() < 0) {
- reponse.setException(new Exception("One or more nodes has
not reported max resource id."));
+ response.setResourceId(resourceIdManager.createResourceId());
+ if (response.getResourceId() < 0) {
+ response.setException(new Exception("One or more nodes has
not reported max resource id."));
}
requestMaxResourceID(clusterStateManager, resourceIdManager,
broker);
}
- broker.sendApplicationMessageToNC(reponse, src);
+ broker.sendApplicationMessageToNC(response, src);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestMessage.java
new file mode 100644
index 0000000..75b9881
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestMessage.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class TxnIdBlockRequestMessage implements ICcAddressedMessage {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final int BLOCK_SIZE = 100;
+ private static final long serialVersionUID = 1L;
+
+ private static BlockingQueue<TxnIdBlockRequestResponseMessage> blockQueue
= new LinkedBlockingQueue<>();
+ private final String nodeId;
+ private final int blockSizeRequested;
+
+ public TxnIdBlockRequestMessage(String nodeId, int blockSizeRequested) {
+ this.nodeId = nodeId;
+ this.blockSizeRequested = blockSizeRequested;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws
HyracksDataException {
+ try {
+ ICCMessageBroker broker = (ICCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
+ long startingId =
appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested);
+ TxnIdBlockRequestResponseMessage response =
+ new TxnIdBlockRequestResponseMessage(startingId,
blockSizeRequested);
+ broker.sendApplicationMessageToNC(response, nodeId);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public static Block send(INcApplicationContext ncs) throws
HyracksDataException {
+ TxnIdBlockRequestMessage blockRequestMessage =
+ new
TxnIdBlockRequestMessage(ncs.getServiceContext().getNodeId(), BLOCK_SIZE);
+ try {
+ ((INCMessageBroker)
ncs.getServiceContext().getMessageBroker()).sendMessageToPrimaryCC(blockRequestMessage);
+ TxnIdBlockRequestResponseMessage response = blockQueue.take();
+ return new Block(response.getStartingId(),
response.getBlockSize());
+ } catch (Exception e) {
+ LOGGER.log(Level.ERROR, "Unable to request transaction id block",
e);
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ static void addResponse(TxnIdBlockRequestResponseMessage response) {
+ blockQueue.offer(response);
+ }
+
+ @Override
+ public String toString() {
+ return TxnIdBlockRequestMessage.class.getSimpleName();
+ }
+
+ public static class Block {
+
+ private final long startingId;
+ private final int blockSize;
+
+ public Block(long startingId, int blockSize) {
+ this.startingId = startingId;
+ this.blockSize = blockSize;
+ }
+
+ public long getStartingId() {
+ return startingId;
+ }
+
+ public int getBlockSize() {
+ return blockSize;
+ }
+ }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestResponseMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestResponseMessage.java
new file mode 100644
index 0000000..ab01a35
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequestResponseMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class TxnIdBlockRequestResponseMessage implements INcAddressedMessage {
+ private static final long serialVersionUID = 1L;
+ private final long startingId;
+ private final int blockSize;
+
+ public TxnIdBlockRequestResponseMessage(long startingId, int blockSize) {
+ this.startingId = startingId;
+ this.blockSize = blockSize;
+ }
+
+ public long getStartingId() {
+ return startingId;
+ }
+
+ public int getBlockSize() {
+ return blockSize;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws
HyracksDataException, InterruptedException {
+ TxnIdBlockRequestMessage.addResponse(this);
+ }
+
+ @Override
+ public String toString() {
+ return TxnIdBlockRequestResponseMessage.class.getSimpleName();
+ }
+}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
similarity index 88%
rename from
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
rename to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
index 8ac6b63..542bc17 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/BulkTxnIdFactory.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.asterix.metadata;
+package org.apache.asterix.runtime.utils;
import java.util.concurrent.atomic.AtomicLong;
@@ -33,7 +33,8 @@
return new TxnId(maxId.incrementAndGet());
}
- public long reserveIdBlock(int blockSize) {
+ @Override
+ public long getIdBlock(int blockSize) {
if (blockSize < 1) {
throw new IllegalArgumentException("block size cannot be smaller
than 1, but was " + blockSize);
}
@@ -44,4 +45,9 @@
public void ensureMinimumId(long id) {
this.maxId.getAndUpdate(next -> Math.max(next, id));
}
+
+ @Override
+ public long getMaxTxnId() {
+ return maxId.get();
+ }
}
\ No newline at end of file
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index b83df6c..4157e16 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -24,7 +24,6 @@
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.INodeJobTracker;
-import org.apache.asterix.common.transactions.ILongBlockFactory;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
@@ -93,8 +92,7 @@
ILibraryManager libraryManager, Supplier<IMetadataBootstrap>
metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager,
INcLifecycleCoordinator ftStrategy,
IJobLifecycleListener activeLifeCycleListener,
IStorageComponentProvider storageComponentProvider,
- IMetadataLockManager mdLockManager, Supplier<ILongBlockFactory>
txnIdBlockSupplier)
- throws AlgebricksException, IOException {
+ IMetadataLockManager mdLockManager) throws AlgebricksException,
IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
this.libraryManager = libraryManager;
@@ -122,7 +120,8 @@
clusterStateManager.setCcAppCtx(this);
this.resourceIdManager = new ResourceIdManager(clusterStateManager);
nodeJobTracker = new NodeJobTracker();
- txnIdFactory = new CcTxnIdFactory(txnIdBlockSupplier);
+ txnIdFactory = new BulkTxnIdFactory();
+
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 58b4b27..d0e4655 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -22,9 +22,6 @@
import java.io.Serializable;
import org.apache.hyracks.api.util.ErrorMessageUtil;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
/**
* The main execution time exception type for runtime errors in a hyracks environment
@@ -32,7 +29,6 @@
public class HyracksDataException extends HyracksException {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = LogManager.getLogger();
public static HyracksDataException create(Throwable cause) {
if (cause instanceof HyracksDataException || cause == null) {
@@ -40,11 +36,8 @@
} else if (cause instanceof Error) {
// don't wrap errors, allow them to propagate
throw (Error) cause;
- } else if (cause instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
- // TODO(mblow): why not force interrupt on current thread?
- LOGGER.log(Level.WARN,
- "Wrapping an InterruptedException in HyracksDataException and current thread is not interrupted",
- cause);
+ } else if (cause instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
}
return new HyracksDataException(cause);
}
@@ -65,10 +58,8 @@
// don't suppress errors into a HyracksDataException, allow them to propagate
th.addSuppressed(root);
throw (Error) th;
- } else if (th instanceof InterruptedException && !Thread.currentThread().isInterrupted()) {
- // TODO(mblow): why not force interrupt on current thread?
- LOGGER.log(Level.WARN, "Suppressing an InterruptedException in a HyracksDataException and current "
- + "thread is not interrupted", th);
+ } else if (th instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
}
root.addSuppressed(th);
return root;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index a95ae3d..5e3c3d4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -21,7 +21,6 @@
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 5c6d078..3d505f3 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1314,6 +1314,7 @@
}
public static class ShutdownResponseFunction extends Function {
+ private static final long serialVersionUID = 1L;
private final String nodeId;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ae40ea3..027316e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -21,7 +21,6 @@
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
--
To view, visit https://asterix-gerrit.ics.uci.edu/2368
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I88f14fb351976db239ed752693e59882da62d588
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>