Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/897
Change subject: Introduce MessagingNetworkManager for NC2NC AppMessaging
......................................................................
Introduce MessagingNetworkManager for NC2NC AppMessaging
This change introduces MessagingNetworkManager to NodeControllerService.
The MessagingNetworkManager is used to open channels that are used for NC2NC
application messaging. The messaging channels are passed to the application
through the IMessageBroker APIs and should be managed by the application itself.
Change-Id: I5c0bd7c11c1e78954ebceff49cb274d8073a64bd
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHelper.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
R
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
R
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
C
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
R
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
R
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
R
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
M
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
A
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NcMessagingUtil.java
M
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
M
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
M
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
M
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
M
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
M
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
M hyracks-fullstack/hyracks/hyracks-net/pom.xml
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
M
hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
33 files changed, 848 insertions(+), 200 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/97/897/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index cc50b75..0cc9570 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -82,6 +82,7 @@
ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
ncConfig1.resultIPAddress = "127.0.0.1";
+ ncConfig1.messagingIPAddress = "127.0.0.1";
ncConfig1.nodeId = ncName;
ncConfig1.resultTTL = 30000;
ncConfig1.resultSweepThreshold = 1000;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 13b0189..836946a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -18,8 +18,10 @@
*/
package org.apache.asterix.messaging;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -44,40 +46,66 @@
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
public class NCMessageBroker implements INCMessageBroker {
- private final static Logger LOGGER =
Logger.getLogger(NCMessageBroker.class.getName());
+ private static final Logger LOGGER =
Logger.getLogger(NCMessageBroker.class.getName());
private final NodeControllerService ncs;
private final AtomicLong messageId = new AtomicLong(0);
private final Map<Long, IApplicationMessageCallback> callbacks;
private final IAsterixAppRuntimeContext appContext;
+ private final Map<String, IChannelControlBlock> ncChannels = new
HashMap<>();
+ private final NCMessagingHelper messagingHelper;
+ private final LinkedBlockingQueue<IApplicationMessage> receivedMsgsQ;
+ private final MessageDeliveryService msgDeliverySvc;
public NCMessageBroker(NodeControllerService ncs) {
this.ncs = ncs;
appContext = (IAsterixAppRuntimeContext)
ncs.getApplicationContext().getApplicationObject();
- callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+ callbacks = new ConcurrentHashMap<>();
+ messagingHelper = new NCMessagingHelper(ncs, this);
+ receivedMsgsQ = new LinkedBlockingQueue<>();
+ msgDeliverySvc = new MessageDeliveryService();
+ appContext.getThreadExecutor().execute(msgDeliverySvc);
}
@Override
- public void sendMessage(IApplicationMessage message,
IApplicationMessageCallback callback) throws Exception {
- if (callback != null) {
- long uniqueMessageId = messageId.incrementAndGet();
- message.setId(uniqueMessageId);
- callbacks.put(uniqueMessageId, callback);
- }
+ public void sendMessageToCC(IApplicationMessage message,
IApplicationMessageCallback callback) throws Exception {
+ registerMsgCallback(message, callback);
try {
ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
} catch (Exception e) {
- if (callback != null) {
- //remove the callback in case of failure
- callbacks.remove(message.getId());
- }
+ handleMsgDeliveryFailure(message);
throw e;
}
+ }
+
+ @Override
+ public void sendMessageToNC(String nodeId, IApplicationMessage message,
IApplicationMessageCallback callback)
+ throws Exception {
+ registerMsgCallback(message, callback);
+ try {
+ IChannelControlBlock ccb = getNCChannel(nodeId);
+ messagingHelper.sendMessageToChannel(ccb, message);
+ } catch (Exception e) {
+ handleMsgDeliveryFailure(message);
+ throw e;
+ }
+ }
+
+ @Override
+ public void registerMessagingChannel(String nodeId, IChannelControlBlock
ccb) {
+ messagingHelper.configureChannel(ccb);
+ addOpenChannel(nodeId, ccb);
+ }
+
+ @Override
+ public void queueReceivedMessage(IApplicationMessage msg) {
+ receivedMsgsQ.offer(msg);
}
@Override
@@ -117,8 +145,54 @@
break;
}
} catch (Exception e) {
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, e.getMessage(), e);
+ }
throw e;
+ }
+ }
+
+ private void registerMsgCallback(IApplicationMessage message,
IApplicationMessageCallback callback) {
+ if (callback != null) {
+ long uniqueMessageId = messageId.incrementAndGet();
+ message.setId(uniqueMessageId);
+ callbacks.put(uniqueMessageId, callback);
+ }
+ }
+
+ private void handleMsgDeliveryFailure(IApplicationMessage message) {
+ callbacks.remove(message.getId());
+ }
+
+ private IChannelControlBlock getNCChannel(String nodeId) throws Exception {
+ synchronized (ncChannels) {
+ // check if there is an existing open channel first
+ IChannelControlBlock ncChannel = ncChannels.get(nodeId);
+ if (ncChannels.get(nodeId) == null) {
+ //open a new channel and register it
+ ncChannel = messagingHelper.openChannel(nodeId);
+ registerMessagingChannel(nodeId, ncChannel);
+ }
+ return ncChannel;
+ }
+ }
+
+ private void addOpenChannel(String nodeId, IChannelControlBlock ccb) {
+ synchronized (ncChannels) {
+ if (ncChannels.get(nodeId) == null) {
+ ncChannels.put(nodeId, ccb);
+ } else {
+ //close this channel since another channel was opened with
this node
+ messagingHelper.closeChannel(ccb);
+ /**
+ * TODO Currently there is a chance that two nodes will open
+ * a channel to each other at the exact same time and both will
+ * end up using a half-closed channel. While this isn't a big
issue,
+ * it should be eliminated by introducing a negotiation protocol
+ * between nodes to decide which channel to use and which
channel
+ * to close fully.
+ */
+ }
}
}
@@ -133,7 +207,7 @@
//send response after takeover is completed
TakeoverPartitionsResponseMessage reponse = new
TakeoverPartitionsResponseMessage(msg.getRequestId(),
appContext.getTransactionSubsystem().getId(),
msg.getPartitions());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
}
}
@@ -145,7 +219,7 @@
} finally {
TakeoverMetadataNodeResponseMessage reponse = new
TakeoverMetadataNodeResponseMessage(
appContext.getTransactionSubsystem().getId());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
}
@@ -156,7 +230,7 @@
long maxResourceId =
Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
maxResourceIdMsg.setMaxResourceId(maxResourceId);
- sendMessage(maxResourceIdMsg, null);
+ sendMessageToCC(maxResourceIdMsg, null);
}
private void handleReplicaEvent(IMessage message) {
@@ -198,7 +272,7 @@
//send response after partitions prepared for failback
PreparePartitionsFailbackResponseMessage reponse = new
PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
msg.getRequestId(), msg.getPartitions());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
private void handleCompleteFailbackRequest(IMessage message) throws
Exception {
@@ -209,7 +283,26 @@
} finally {
CompleteFailbackResponseMessage reponse = new
CompleteFailbackResponseMessage(msg.getPlanId(),
msg.getRequestId(), msg.getPartitions());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
+ }
+ }
+
+ private class MessageDeliveryService implements Runnable {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ IApplicationMessage msg = receivedMsgsQ.take();
+ //TODO add nodeId to IApplicationMessage and pass it
+ receivedMessage(msg, null);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Could not process message",
e);
+ }
+ }
+ }
}
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHelper.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHelper.java
new file mode 100644
index 0000000..a8908aa
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHelper.java
@@ -0,0 +1,217 @@
+package org.apache.asterix.messaging;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.external.feed.dataflow.FrameAction;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+
+public class NCMessagingHelper {
+
+ private static final Logger LOGGER =
Logger.getLogger(NCMessagingHelper.class.getName());
+ //TODO make these values configurable and account for their memory usage
+ private static final int MESSAGING_BUFFER_SIZE =
StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+ private static final int MESSAGING_MEMORY_BUDGET = 512 *
MESSAGING_BUFFER_SIZE;
+ private static final int MAX_MESSAGING_BUFFER_PER_CHANNEL = 16;
+
+ /**
+ * A buffer type to indicate whether an empty buffer was used during
+ * a write or read operation.
+ */
+ private enum AppMessagingEmptyBufferType {
+ READ_BUFFER,
+ WRITE_BUFFER
+ }
+
+ /* A single buffer factory used by all messaging channels */
+ private final IBufferFactory appMessagingBufferFactor = new
AppMessagingBufferFactory();
+
+ private final INCMessageBroker messageBroker;
+ private final ConcurrentFramePool framePool;
+ private final NodeControllerService ncs;
+
+ public NCMessagingHelper(NodeControllerService ncs, INCMessageBroker
messageBroker) {
+ this.ncs = ncs;
+ this.messageBroker = messageBroker;
+ framePool = new ConcurrentFramePool(ncs.getId(),
MESSAGING_MEMORY_BUDGET, MESSAGING_BUFFER_SIZE);
+ }
+
+ public IChannelControlBlock openChannel(String nodeId) throws Exception {
+ // Get active nodes info from CC
+ //TODO Don't send a request to CC for node info each time but only on
node membership changes.
+ Map<String, NodeControllerInfo> nodeControllers =
ncs.getNodeControllersInfo();
+
+ // Get the node messaging address from its info
+ NodeControllerInfo nodeControllerInfo = nodeControllers.get(nodeId);
+ if (nodeControllerInfo == null) {
+ throw new HyracksDataException("Could not find node: " + nodeId);
+ }
+ NetworkAddress nodeMessagingNeAddress =
nodeControllerInfo.getMessagingNetworkAddress();
+ SocketAddress nodeAddress = new
InetSocketAddress(InetAddress.getByName(nodeMessagingNeAddress.getAddress()),
+ nodeMessagingNeAddress.getPort());
+
+ // Open a channel using MessagingNetManager
+ IChannelControlBlock ccb =
ncs.getMessagingNetManager().connect(nodeAddress);
+ //Send the initial app messaging handshake message to register the
opened channel on both nodes
+ ByteBuffer initialBuffer =
ncs.getMessagingNetManager().constructAppMessagingInitialMessage(ncs.getId());
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(initialBuffer);
+ return ccb;
+ }
+
+ public void configureChannel(IChannelControlBlock ccb) {
+ /**
+ * Set the app messaging buffer factory and acceptors
+ * to the channel read and write interfaces
+ */
+ ccb.getWriteInterface().setEmptyBufferAcceptor(
+ new AppMessagingEmptyBufferAcceptor(ccb,
AppMessagingEmptyBufferType.WRITE_BUFFER));
+ ccb.getReadInterface().setBufferFactory(appMessagingBufferFactor,
MAX_MESSAGING_BUFFER_PER_CHANNEL,
+ MESSAGING_BUFFER_SIZE);
+ AppMessagingEmptyBufferAcceptor readEmptyBufferAcceptor = new
AppMessagingEmptyBufferAcceptor(ccb,
+ AppMessagingEmptyBufferType.READ_BUFFER);
+ ccb.getReadInterface().setEmptyBufferAcceptor(readEmptyBufferAcceptor);
+ ccb.getReadInterface().setFullBufferAcceptor(new
AppMessagingReadFullBufferAcceptor(readEmptyBufferAcceptor));
+ }
+
+ public void sendMessageToChannel(IChannelControlBlock ccb,
IApplicationMessage msg) throws IOException {
+ byte[] serializedMsg = JavaSerializationUtils.serialize(msg);
+ if (serializedMsg.length > MESSAGING_BUFFER_SIZE) {
+ throw new HyracksDataException("Message exceded maximum size");
+ }
+ // Prepare the message buffer
+ ByteBuffer msgBuffer = framePool.get();
+ if (msgBuffer == null) {
+ throw new HyracksDataException("Could not get an empty buffer");
+ }
+ msgBuffer.clear();
+ msgBuffer.put(serializedMsg);
+ msgBuffer.flip();
+ // Give the buffer to the channel write interface for writing
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
+ }
+
+ public void closeChannel(IChannelControlBlock ccb) {
+ /**
+ * Close the channel write interface which will result in
+ * sending a CLOSE_CHANNEL request to the remote node of
+ * the channel.
+ */
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ /**
+ * A buffer acceptor that receives the read buffers containing messages
from
+ * other nodes.
+ */
+ private class AppMessagingReadFullBufferAcceptor implements
ICloseableBufferAcceptor {
+ private final IBufferAcceptor recycle;
+
+ private AppMessagingReadFullBufferAcceptor(IBufferAcceptor recycle) {
+ this.recycle = recycle;
+ }
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ try {
+ IApplicationMessage receivedMsg = (IApplicationMessage)
JavaSerializationUtils
+ .deserialize(buffer.array());
+ //queue the received message and free the network IO thread
+ messageBroker.queueReceivedMessage(receivedMsg);
+ } catch (ClassNotFoundException | IOException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, e.getMessage(), e);
+ }
+ } finally {
+ recycle.accept(buffer);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void error(int ecode) {
+ }
+ }
+
+ /**
+ * Empty buffer acceptor used to return the used buffers in app messaging
+ * to the buffer pool.
+ */
+ private class AppMessagingEmptyBufferAcceptor implements IBufferAcceptor {
+ private final AppMessagingEmptyBufferType bufferType;
+ private final ChannelControlBlock ccb;
+
+ public AppMessagingEmptyBufferAcceptor(IChannelControlBlock ccb,
AppMessagingEmptyBufferType bufferType) {
+ this.ccb = (ChannelControlBlock) ccb;
+ this.bufferType = bufferType;
+ }
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ try {
+ int remaining = buffer.remaining();
+ if (bufferType == AppMessagingEmptyBufferType.READ_BUFFER) {
+ /**
+ * add the number of received bytes from this channel
+ * as write credits to make sure the messaging channel's
+ * full write credit is always available.
+ */
+ ccb.addWriteCredits(remaining);
+ }
+ framePool.release(buffer);
+ } catch (HyracksDataException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * A buffer factory based on {@link ConcurrentFramePool}. Used
+ * for messaging channels buffers.
+ */
+ private final class AppMessagingBufferFactory implements IBufferFactory {
+ private final FrameAction frameAction = new FrameAction();
+
+ @Override
+ public ByteBuffer createBuffer() {
+ ByteBuffer buffer = framePool.get();
+ if (buffer == null) {
+ try {
+ framePool.subscribe(frameAction);
+ buffer = frameAction.retrieve();
+ } catch (HyracksDataException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, e.getMessage(), e);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return buffer;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index fba74e8..bcf7a8c 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -43,7 +43,7 @@
/**
* Sets a unique message id that identifies this message within an NC.
- * This id is set by {@link
INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+ * This id is set by {@link
INCMessageBroker#sendMessageToCC(IApplicationMessage,
IApplicationMessageCallback)}
* when the callback is not null to notify the sender when the response to
that message is received.
*
* @param messageId
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 41f8a0c..7628167 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,17 @@
* @param callback
* @throws Exception
*/
- public void sendMessage(IApplicationMessage message,
IApplicationMessageCallback callback) throws Exception;
+ public void sendMessageToCC(IApplicationMessage message,
IApplicationMessageCallback callback) throws Exception;
+
+ /**
+ * Sends application message from this NC to another NC.
+ *
+ * @param message
+ * @param callback
+ * @throws Exception
+ */
+ public void sendMessageToNC(String nodeId, IApplicationMessage message,
IApplicationMessageCallback callback)
+ throws Exception;
/**
* Sends the maximum resource id on this NC to the CC.
@@ -37,4 +47,11 @@
* @throws Exception
*/
public void reportMaxResourceId() throws Exception;
+
+ /**
+ * Queue a message to this {@link INCMessageBroker} for processing
+ *
+ * @param msg
+ */
+ public void queueReceivedMessage(IApplicationMessage msg);
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
index 5b29530..ab1ebe1 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -57,7 +57,7 @@
//if no response available or it has an exception, request a new
one
if (reponse == null || reponse.getException() != null) {
ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
- ((INCMessageBroker)
appCtx.getMessageBroker()).sendMessage(msg, this);
+ ((INCMessageBroker)
appCtx.getMessageBroker()).sendMessageToCC(msg, this);
reponse = (ResourceIdRequestResponseMessage)
resourceIdResponseQ.take();
if (reponse.getException() != null) {
throw new
HyracksDataException(reponse.getException().getMessage());
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
index c41dafe..a79b955 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
@@ -33,12 +33,15 @@
private final NetworkAddress datasetNetworkAddress;
+ private final NetworkAddress messagingNetworkAddress;
+
public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress
netAddress,
- NetworkAddress datasetNetworkAddress) {
+ NetworkAddress datasetNetworkAddress, NetworkAddress
messagingNetworkAddress) {
this.nodeId = nodeId;
this.status = status;
this.netAddress = netAddress;
this.datasetNetworkAddress = datasetNetworkAddress;
+ this.messagingNetworkAddress = messagingNetworkAddress;
}
public String getNodeId() {
@@ -56,4 +59,8 @@
public NetworkAddress getDatasetNetworkAddress() {
return datasetNetworkAddress;
}
+
+ public NetworkAddress getMessagingNetworkAddress() {
+ return messagingNetworkAddress;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
similarity index 95%
rename from
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
rename to
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
index d7febd2..d69ddd7 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
import java.nio.ByteBuffer;
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IBufferFactory.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
similarity index 94%
rename from
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IBufferFactory.java
rename to
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
index 53299d8..5188a43 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IBufferFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.net.protocols.muxdemux;
+package org.apache.hyracks.api.comm;
import java.nio.ByteBuffer;
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
similarity index 62%
copy from
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
copy to
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
index 1697cfd..4b327b3 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
@@ -16,25 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
-/**
- * A buffer acceptor that can be closed to indicate end of transmission or an
error code
- * specified to indicate an error in transmission.
- *
- * @author vinayakb
- */
-public interface ICloseableBufferAcceptor extends IBufferAcceptor {
- /**
- * Close the buffer acceptor.
- */
- public void close();
+public interface IChannelControlBlock {
/**
- * Indicate that an error occurred.
+ * Get the read interface of this channel.
*
- * @param ecode
- * - the error code.
+ * @return the read interface.
*/
- public void error(int ecode);
+ IChannelReadInterface getReadInterface();
+
+ /**
+ * Get the write interface of this channel.
+ *
+ * @return the write interface.
+ */
+ IChannelWriteInterface getWriteInterface();
+
+ /**
+ * Add write credit to this channel.
+ *
+ * @param delta
+ * number of bytes
+ */
+ public void addWriteCredits(int delta);
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
similarity index 77%
rename from
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
rename to
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
index 8639fb7..86189f1 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
@@ -16,13 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.net.protocols.muxdemux;
-
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+package org.apache.hyracks.api.comm;
/**
- * Represents the read interface of a {@link ChannelControlBlock}.
+ * Represents the read interface of a {@link IChannelControlBlock}.
*
* @author vinayakb
*/
@@ -56,4 +53,19 @@
* - the size of each buffer
*/
public void setBufferFactory(IBufferFactory bufferFactory, int limit, int
frameSize);
+
+ /**
+ * Set a flag to indicate whether to flush the current read buffer
+ * on complete reads or wait until the buffer is completely filled
+ *
+ * @param flushOnCompleteRead
+ */
+ public void setFlushOnCompleteRead(boolean flushOnCompleteRead);
+
+ /**
+ * Overrides the default empty buffer acceptor of this {@link
IChannelReadInterface}
+ *
+ * @param eba
+ */
+ public void setEmptyBufferAcceptor(IBufferAcceptor eba);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
similarity index 91%
rename from
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
rename to
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
index dc38ea4..6c4ef0e 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
@@ -16,10 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.net.protocols.muxdemux;
-
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+package org.apache.hyracks.api.comm;
/**
- * Represents the write interface of a {@link ChannelControlBlock}.
+ * Represents the write interface of a {@link IChannelControlBlock}.
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
similarity index 96%
rename from
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
rename to
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
index 1697cfd..fba0eaf 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
/**
* A buffer acceptor that can be closed to indicate end of transmission or an
error code
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
index 2b166d2..7ba9581 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
@@ -18,11 +18,13 @@
*/
package org.apache.hyracks.api.messages;
-/**
- * @author rico
- */
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+
public interface IMessageBroker {
public void receivedMessage(IMessage message, String nodeId) throws
Exception;
+ /**
+ * Hands a messaging channel to this broker. The default implementation
+ * does not support messaging channels and always throws
+ * {@link UnsupportedOperationException}; a broker that accepts channels
+ * has to override this method.
+ *
+ * @param nodeId
+ *            id of the node the channel was opened with
+ * @param ccb
+ *            control block of the messaging channel
+ * @throws UnsupportedOperationException
+ *             always, unless the method is overridden
+ */
+ public default void registerMessagingChannel(String nodeId,
IChannelControlBlock ccb) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
index fba7cf5..8f72bb6 100644
---
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -27,12 +27,12 @@
import org.apache.hyracks.api.channels.IInputChannel;
import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.context.IHyracksCommonContext;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
public class DatasetNetworkInputChannel implements IInputChannel {
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NcMessagingUtil.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NcMessagingUtil.java
new file mode 100644
index 0000000..17ae249
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NcMessagingUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.comm.channels;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+
+/**
+ * Builds and parses the initial buffers of data-partition and
+ * application-messaging channels. Every buffer built here starts with one
+ * byte identifying the {@link ChannelType}, followed by the type-specific
+ * payload. The {@code read*} methods do not consume that type byte; the
+ * buffer handed to them must already be positioned past it.
+ */
+public class NcMessagingUtil {
+
+    /**
+     * Two kilobytes, expressed in bytes.
+     * NOTE(review): named as the upper bound of an initial message, but
+     * nothing in this class enforces it; confirm that callers do.
+     */
+    public static final int MAX_INITIAL_MESSAGE_SIZE = StorageUtil.getSizeInBytes(2, StorageUnit.KILOBYTE);
+
+    /**
+     * Charset used for node ids inside initial messages. It is fixed so that
+     * the encoded length does not depend on the platform default charset of
+     * the writing or the reading side.
+     */
+    private static final java.nio.charset.Charset NODE_ID_CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+
+    private NcMessagingUtil() {
+        throw new AssertionError("This util class should not be initialized");
+    }
+
+    /**
+     * Channel kinds, each identified inside an initial message by a single byte.
+     */
+    public enum ChannelType {
+        // 1 (type) + 8 (job id) + 4 (connector id) + 4 (sender index) + 4 (receiver index)
+        DATA_PARTITION((byte) 0, 21),
+        // 1 (type) + 4 (node id length); the node id bytes come on top of this
+        APP_MESSAGING((byte) 1, 5);
+
+        private static final Map<Byte, ChannelType> TYPES = Collections.unmodifiableMap(initializeMapping());
+        private final int initialMessageBaseSize;
+        private final byte type;
+
+        ChannelType(byte type, int initialMessageBaseSize) {
+            this.type = type;
+            this.initialMessageBaseSize = initialMessageBaseSize;
+        }
+
+        /**
+         * @return the byte that identifies this channel type
+         */
+        public byte getType() {
+            return type;
+        }
+
+        /**
+         * @return the size in bytes of the fixed part of this type's initial message
+         */
+        public int getInitialMessageBaseSize() {
+            return initialMessageBaseSize;
+        }
+
+        /**
+         * Looks up a channel type by its identifying byte.
+         *
+         * @param type
+         *            the identifying byte
+         * @return the matching type, or {@code null} if no type uses that byte
+         */
+        public static ChannelType getChannelType(byte type) {
+            return TYPES.get(type);
+        }
+
+        private static Map<Byte, ChannelType> initializeMapping() {
+            Map<Byte, ChannelType> mapping = new HashMap<>();
+            for (ChannelType s : ChannelType.values()) {
+                mapping.put(s.getType(), s);
+            }
+            return mapping;
+        }
+    }
+
+    /**
+     * Builds the initial buffer of a data-partition channel: type byte, job id
+     * (long), connector descriptor id (int), sender index (int) and receiver
+     * index (int).
+     *
+     * @param partitionId
+     *            the partition the channel is opened for
+     * @return a buffer flipped for reading
+     */
+    public static ByteBuffer getDataPartitionChannelInitialBuffer(PartitionId partitionId) {
+        ByteBuffer initialBuffer = ByteBuffer.allocate(ChannelType.DATA_PARTITION.getInitialMessageBaseSize());
+        initialBuffer.put(ChannelType.DATA_PARTITION.getType());
+        initialBuffer.putLong(partitionId.getJobId().getId());
+        initialBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+        initialBuffer.putInt(partitionId.getSenderIndex());
+        initialBuffer.putInt(partitionId.getReceiverIndex());
+        initialBuffer.flip();
+        return initialBuffer;
+    }
+
+    /**
+     * Builds the initial buffer of an application-messaging channel: type
+     * byte, length of the encoded node id (int) and the encoded node id.
+     *
+     * @param nodeId
+     *            the node id to put into the message
+     * @return a buffer flipped for reading
+     */
+    public static ByteBuffer getMessagingChannelInitialBuffer(String nodeId) {
+        // Encode once and size everything from the byte count: the number of
+        // chars differs from the number of bytes for non-ASCII ids.
+        byte[] nodeIdBytes = nodeId.getBytes(NODE_ID_CHARSET);
+        ByteBuffer initialBuffer = ByteBuffer
+                .allocate(ChannelType.APP_MESSAGING.getInitialMessageBaseSize() + nodeIdBytes.length);
+        initialBuffer.put(ChannelType.APP_MESSAGING.getType());
+        initialBuffer.putInt(nodeIdBytes.length);
+        initialBuffer.put(nodeIdBytes);
+        initialBuffer.flip();
+        return initialBuffer;
+    }
+
+    /**
+     * Reads the payload of a data-partition initial message. The channel-type
+     * byte is not read here.
+     *
+     * @param buffer
+     *            buffer positioned at the job id
+     * @return the partition id described by the message
+     */
+    public static PartitionId readDataPartitionInitialMessage(ByteBuffer buffer) {
+        JobId jobId = new JobId(buffer.getLong());
+        ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
+        int senderIndex = buffer.getInt();
+        int receiverIndex = buffer.getInt();
+        return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
+    }
+
+    /**
+     * Reads the payload of an application-messaging initial message. The
+     * channel-type byte is not read here.
+     *
+     * @param buffer
+     *            buffer positioned at the node id length
+     * @return the node id carried by the message
+     */
+    public static String readAppMessagingInitialMessage(ByteBuffer buffer) {
+        int nodeIdLength = buffer.getInt();
+        byte[] stringBytes = new byte[nodeIdLength];
+        buffer.get(stringBytes);
+        // Decode with the same charset the writer used.
+        return new String(stringBytes, NODE_ID_CHARSET);
+    }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 235536f..7938fc1 100644
---
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -27,11 +27,11 @@
import org.apache.hyracks.api.channels.IInputChannel;
import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.context.IHyracksCommonContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.partitions.PartitionId;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
public class NetworkInputChannel implements IInputChannel {
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 46220de..60e2e35 100644
---
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -22,9 +22,9 @@
import java.util.ArrayDeque;
import java.util.Deque;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
public class NetworkOutputChannel implements IFrameWriter {
diff --git
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
index 3aa77b9..7628cd2 100644
---
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
@@ -20,8 +20,8 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.net.protocols.muxdemux.IBufferFactory;
/**
* @author yingyib
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index d93ba0d..ff5832a 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -23,10 +23,6 @@
import java.util.Map;
import java.util.Set;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.base.INodeController;
@@ -35,6 +31,9 @@
import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
import
org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
public class NodeControllerState {
private static final int RRD_SIZE = 720;
@@ -46,6 +45,8 @@
private final NetworkAddress dataPort;
private final NetworkAddress datasetPort;
+
+ private final NetworkAddress messagingPort;
private final Set<JobId> activeJobIds;
@@ -142,6 +143,7 @@
ncConfig = reg.getNCConfig();
dataPort = reg.getDataPort();
datasetPort = reg.getDatasetPort();
+ messagingPort = reg.getMessagingPort();
activeJobIds = new HashSet<JobId>();
osName = reg.getOSName();
@@ -264,6 +266,10 @@
return datasetPort;
}
+ /**
+ * @return the messaging network address taken from the node's registration
+ */
+ public NetworkAddress getMessagingPort() {
+ return messagingPort;
+ }
+
public JSONObject toSummaryJSON() throws JSONException {
JSONObject o = new JSONObject();
o.put("node-id", ncConfig.nodeId);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 785a202..726bf12 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -40,11 +40,11 @@
+ /**
+ * Builds a node-id to {@link NodeControllerInfo} map from the cluster
+ * controller's node map, reporting every node in it as
+ * {@link NodeStatus#ALIVE} together with its data, dataset and messaging
+ * addresses, and hands the map to the callback.
+ */
@Override
public void run() {
- Map<String, NodeControllerInfo> result = new LinkedHashMap<String,
NodeControllerInfo>();
+ Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
- result.put(e.getKey(), new NodeControllerInfo(e.getKey(),
NodeStatus.ALIVE, e.getValue().getDataPort(), e
- .getValue().getDatasetPort()));
+ result.put(e.getKey(), new NodeControllerInfo(e.getKey(),
NodeStatus.ALIVE, e.getValue().getDataPort(),
+ e.getValue().getDatasetPort(),
e.getValue().getMessagingPort()));
}
callback.setValue(result);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index b408083..6eb5264 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,18 +18,18 @@
*/
package org.apache.hyracks.control.common.controllers;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.control.common.application.IniApplicationConfig;
import org.ini4j.Ini;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.spi.StopOptionHandler;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.Map;
public class NCConfig implements Serializable {
private static final long serialVersionUID = 2L;
@@ -82,7 +82,7 @@
@Option(name = "-result-public-port", usage = "Public IP port to announce
dataset result distribution listener (default: same as -result-port; must set
-result-public-ip-address also)", required = false)
public int resultPublicPort = 0;
- @Option(name = "-retries", usage ="Number of attempts to contact CC before
giving up (default = 5)")
+ @Option(name = "-retries", usage = "Number of attempts to contact CC
before giving up (default = 5)")
public int retries = 5;
@Option(name = "-iodevices", usage = "Comma separated list of IO Device
mount points (default: One device in default temp folder)", required = false)
@@ -112,6 +112,19 @@
@Option(name = "-config-file", usage = "Specify path to local
configuration file (default: no local config)", required = false)
public String configFile = null;
+ //TODO add messaging values to NC start scripts
+ // NOTE(review): the usage texts below promise defaults taken from -address and
+ // -messaging-ip-address, but the defaulting code visible in this change only
+ // covers the cluster, data and result addresses; confirm the messaging
+ // addresses are defaulted somewhere before they are used.
+ @Option(name = "-messaging-ip-address", usage = "IP Address to bind messaging listener (default: same as -address)", required = false)
+ public String messagingIPAddress;
+
+ @Option(name = "-messaging-port", usage = "IP port to bind messaging listener (default: random port)", required = false)
+ public int messagingPort = 0;
+
+ @Option(name = "-messaging-public-ip-address", usage = "Public IP Address to announce messaging listener (default: same as -messaging-ip-address)", required = false)
+ public String messagingPublicIPAddress;
+
+ // The public port only takes effect together with the public IP address,
+ // so that is the option the usage text has to point at.
+ @Option(name = "-messaging-public-port", usage = "Public IP port to announce messaging listener (default: same as -messaging-port; must set -messaging-public-ip-address also)", required = false)
+ public int messagingPublicPort = 0;
+
@Argument
@Option(name = "--", handler = StopOptionHandler.class)
public List<String> appArgs;
@@ -139,13 +152,14 @@
resultIPAddress = IniUtils.getString(ini, nodeSection,
"result.address", resultIPAddress);
resultPort = IniUtils.getInt(ini, nodeSection, "result.port",
resultPort);
- clusterNetPublicIPAddress = IniUtils.getString(
- ini, nodeSection, "public.cluster.address",
clusterNetPublicIPAddress);
+ clusterNetPublicIPAddress = IniUtils.getString(ini, nodeSection,
"public.cluster.address",
+ clusterNetPublicIPAddress);
clusterNetPublicPort = IniUtils.getInt(ini, nodeSection,
"public.cluster.port", clusterNetPublicPort);
dataPublicIPAddress = IniUtils.getString(ini, nodeSection,
"public.data.address", dataPublicIPAddress);
dataPublicPort = IniUtils.getInt(ini, nodeSection, "public.data.port",
dataPublicPort);
resultPublicIPAddress = IniUtils.getString(ini, nodeSection,
"public.result.address", resultPublicIPAddress);
resultPublicPort = IniUtils.getInt(ini, nodeSection,
"public.result.port", resultPublicPort);
+ //TODO pass messaging info from ini file
retries = IniUtils.getInt(ini, nodeSection, "retries", retries);
@@ -169,23 +183,32 @@
}
// "address" is the default for all IP addresses
- if (clusterNetIPAddress == null) clusterNetIPAddress = ipAddress;
- if (dataIPAddress == null) dataIPAddress = ipAddress;
- if (resultIPAddress == null) resultIPAddress = ipAddress;
+ if (clusterNetIPAddress == null)
+ clusterNetIPAddress = ipAddress;
+ if (dataIPAddress == null)
+ dataIPAddress = ipAddress;
+ if (resultIPAddress == null)
+ resultIPAddress = ipAddress;
// All "public" options default to their "non-public" versions
- if (clusterNetPublicIPAddress == null) clusterNetPublicIPAddress =
clusterNetIPAddress;
- if (clusterNetPublicPort == 0) clusterNetPublicPort = clusterNetPort;
- if (dataPublicIPAddress == null) dataPublicIPAddress = dataIPAddress;
- if (dataPublicPort == 0) dataPublicPort = dataPort;
- if (resultPublicIPAddress == null) resultPublicIPAddress =
resultIPAddress;
- if (resultPublicPort == 0) resultPublicPort = resultPort;
+ if (clusterNetPublicIPAddress == null)
+ clusterNetPublicIPAddress = clusterNetIPAddress;
+ if (clusterNetPublicPort == 0)
+ clusterNetPublicPort = clusterNetPort;
+ if (dataPublicIPAddress == null)
+ dataPublicIPAddress = dataIPAddress;
+ if (dataPublicPort == 0)
+ dataPublicPort = dataPort;
+ if (resultPublicIPAddress == null)
+ resultPublicIPAddress = resultIPAddress;
+ if (resultPublicPort == 0)
+ resultPublicPort = resultPort;
}
/**
* @return An IApplicationConfig representing this NCConfig.
- * Note: Currently this only includes the values from the configuration
- * file, not anything specified on the command-line. QQQ
+ * Note: Currently this only includes the values from the
configuration
+ * file, not anything specified on the command-line. QQQ
*/
public IApplicationConfig getAppConfig() {
return new IniApplicationConfig(ini);
@@ -222,6 +245,14 @@
cList.add(resultPublicIPAddress);
cList.add("-result-public-port");
cList.add(String.valueOf(resultPublicPort));
+ cList.add("-messaging-ip-address");
+ cList.add(messagingIPAddress);
+ cList.add("-messaging-port");
+ cList.add(String.valueOf(messagingPort));
+ cList.add("-messaging-public-ip-address");
+ cList.add(messagingPublicIPAddress);
+ cList.add("-messaging-public-port");
+ cList.add(String.valueOf(messagingPublicPort));
cList.add("-retries");
cList.add(String.valueOf(retries));
cList.add("-iodevices");
@@ -275,7 +306,10 @@
configuration.put("result-time-to-live", String.valueOf(resultTTL));
configuration.put("result-sweep-threshold",
String.valueOf(resultSweepThreshold));
configuration.put("result-manager-memory",
String.valueOf(resultManagerMemory));
-
+ configuration.put("messaging-ip-address", messagingIPAddress);
+ configuration.put("messaging-port", String.valueOf(messagingPort));
+ configuration.put("messaging-public-ip-address",
messagingPublicIPAddress);
+ configuration.put("messaging-public-port",
String.valueOf(messagingPublicPort));
if (appNCMainClass != null) {
configuration.put("app-nc-main-class", appNCMainClass);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 5a23455..bb8022e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -65,10 +65,13 @@
private final HeartbeatSchema hbSchema;
+ private final NetworkAddress messagingPort;
+
public NodeRegistration(InetSocketAddress ncAddress, String nodeId,
NCConfig ncConfig, NetworkAddress dataPort,
NetworkAddress datasetPort, String osName, String arch, String
osVersion, int nProcessors, String vmName,
String vmVersion, String vmVendor, String classpath, String
libraryPath, String bootClasspath,
- List<String> inputArguments, Map<String, String> systemProperties,
HeartbeatSchema hbSchema) {
+ List<String> inputArguments, Map<String, String> systemProperties,
HeartbeatSchema hbSchema,
+ NetworkAddress messagingPort) {
this.ncAddress = ncAddress;
this.nodeId = nodeId;
this.ncConfig = ncConfig;
@@ -87,6 +90,7 @@
this.inputArguments = inputArguments;
this.systemProperties = systemProperties;
this.hbSchema = hbSchema;
+ this.messagingPort = messagingPort;
}
public InetSocketAddress getNodeControllerAddress() {
@@ -160,4 +164,8 @@
public Map<String, String> getSystemProperties() {
return systemProperties;
}
+
+ /**
+ * @return the messaging network address that was passed to the constructor
+ */
+ public NetworkAddress getMessagingPort() {
+ return messagingPort;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2f8def1..620f3ca 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -73,6 +73,7 @@
import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
import org.apache.hyracks.control.nc.net.DatasetNetworkManager;
+import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
import org.apache.hyracks.control.nc.net.NetworkManager;
import org.apache.hyracks.control.nc.partitions.PartitionManager;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
@@ -157,6 +158,8 @@
private boolean shuttedDown = false;
private IIOCounter ioCounter;
+
+ private MessagingNetworkManager messagingNetManager;
public NodeControllerService(NCConfig ncConfig) throws Exception {
this.ncConfig = ncConfig;
@@ -248,6 +251,9 @@
datasetNetworkManager = new
DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
datasetPartitionManager, ncConfig.nNetThreads,
ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
ncConfig.resultPublicPort);
+ messagingNetManager = new
MessagingNetworkManager(appCtx.getMessageBroker(), ncConfig.messagingIPAddress,
+ ncConfig.messagingPort, ncConfig.nNetThreads,
ncConfig.messagingPublicIPAddress,
+ ncConfig.messagingPublicPort);
}
@Override
@@ -260,7 +266,9 @@
init();
datasetNetworkManager.start();
- IIPCHandle ccIPCHandle = ipc.getHandle(new
InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
+ messagingNetManager.start();
+ IIPCHandle ccIPCHandle = ipc.getHandle(new
InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort),
+ ncConfig.retries);
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
@@ -270,12 +278,12 @@
// Use "public" versions of network addresses and ports
NetworkAddress datasetAddress =
datasetNetworkManager.getPublicNetworkAddress();
NetworkAddress netAddress = netManager.getPublicNetworkAddress();
- ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id,
ncConfig, netAddress,
- datasetAddress, osMXBean.getName(), osMXBean.getArch(),
osMXBean
- .getVersion(), osMXBean.getAvailableProcessors(),
runtimeMXBean.getVmName(), runtimeMXBean
- .getVmVersion(), runtimeMXBean.getVmVendor(),
runtimeMXBean.getClassPath(), runtimeMXBean
- .getLibraryPath(), runtimeMXBean.getBootClassPath(),
runtimeMXBean.getInputArguments(),
- runtimeMXBean.getSystemProperties(), hbSchema));
+ NetworkAddress meesagingPort =
messagingNetManager.getPublicNetworkAddress();
+ ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id,
ncConfig, netAddress, datasetAddress,
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(),
osMXBean.getAvailableProcessors(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(),
runtimeMXBean.getVmVendor(),
+ runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(),
runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getInputArguments(),
runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort));
synchronized (this) {
while (registrationPending) {
@@ -569,6 +577,10 @@
return datasetPartitionManager;
}
+ /**
+ * @return the {@link MessagingNetworkManager} held by this service
+ */
+ public MessagingNetworkManager getMessagingNetManager() {
+ return messagingNetManager;
+ }
+
/**
* Shutdown hook that invokes {@link NCApplicationEntryPoint#stop() stop}
method.
*/
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
index 3c99ef3..a7c5f78 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -25,6 +25,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.dataset.ResultSetId;
@@ -32,7 +33,6 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
import org.apache.hyracks.net.exceptions.NetException;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
new file mode 100644
index 0000000..a04a939
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
@@ -0,0 +1,138 @@
+package org.apache.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
+import org.apache.hyracks.net.exceptions.NetException;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+
+/**
+ * Network manager for application-messaging channels between node
+ * controllers. Outgoing channels are opened through
+ * {@link #connect(SocketAddress)}. For an incoming channel the first buffer
+ * is read as a length-prefixed node id, after which the channel is handed to
+ * the {@link IMessageBroker}.
+ */
+public class MessagingNetworkManager implements IChannelConnectionFactory {
+
+    public static final int MAX_INITIAL_MESSAGE_SIZE = StorageUtil.getSizeInBytes(2, StorageUnit.KILOBYTE);
+    private static final Logger LOGGER = Logger.getLogger(MessagingNetworkManager.class.getName());
+    private static final int MAX_CONNECTION_ATTEMPTS = 5;
+    /**
+     * Charset used for node ids inside initial messages. It is fixed so that
+     * the encoded length does not depend on the platform default charset of
+     * the writing or the reading side.
+     */
+    private static final java.nio.charset.Charset NODE_ID_CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+    private final MuxDemux md;
+    private final IMessageBroker messageBroker;
+    private NetworkAddress localNetworkAddress;
+    private NetworkAddress publicNetworkAddress;
+
+    /**
+     * @param messageBroker
+     *            broker that receives every incoming messaging channel
+     * @param inetAddress
+     *            address the listener binds to
+     * @param inetPort
+     *            port the listener binds to
+     * @param nThreads
+     *            thread count passed to the underlying {@link MuxDemux}
+     * @param publicInetAddress
+     *            address to announce, or {@code null} to announce the local one
+     * @param publicInetPort
+     *            port to announce, or {@code 0} to announce the local one
+     */
+    public MessagingNetworkManager(IMessageBroker messageBroker, String inetAddress, int inetPort, int nThreads,
+            String publicInetAddress, int publicInetPort) {
+        this.messageBroker = messageBroker;
+        md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
+                MAX_CONNECTION_ATTEMPTS);
+        publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
+    }
+
+    /**
+     * Starts the underlying {@link MuxDemux} and resolves the address that is
+     * announced to other nodes.
+     *
+     * @throws IOException
+     *             if the underlying {@link MuxDemux} fails to start
+     */
+    public void start() throws IOException {
+        md.start();
+        InetSocketAddress sockAddr = md.getLocalAddress();
+        localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), sockAddr.getPort());
+
+        // See if the public address was explicitly specified, and if not,
+        // make it a copy of localNetworkAddress
+        if (publicNetworkAddress.getAddress() == null) {
+            publicNetworkAddress = localNetworkAddress;
+        } else {
+            // Likewise for public port
+            if (publicNetworkAddress.getPort() == 0) {
+                publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort());
+            }
+        }
+    }
+
+    /**
+     * Currently a no-op.
+     */
+    public void stop() {
+
+    }
+
+    @Override
+    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+        MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+        ChannelControlBlock openChannel = mConn.openChannel();
+        // Since this is a messaging channel, it might not send complete frames.
+        // Therefore, make it flush on every read message.
+        openChannel.getReadInterface().setFlushOnCompleteRead(true);
+        return openChannel;
+    }
+
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
+        return md.getPerformanceCounters();
+    }
+
+    /**
+     * @return the address announced to other nodes; before {@link #start()}
+     *         this is the address exactly as passed to the constructor
+     */
+    public NetworkAddress getPublicNetworkAddress() {
+        return publicNetworkAddress;
+    }
+
+    /**
+     * Builds the initial message of an application-messaging channel: the
+     * length of the encoded node id (int) followed by the encoded node id.
+     *
+     * @param ncId
+     *            the node id to put into the message
+     * @return a buffer flipped for reading
+     * @throws HyracksDataException
+     *             if the message would exceed {@link #MAX_INITIAL_MESSAGE_SIZE}
+     */
+    public ByteBuffer constructAppMessagingInitialMessage(String ncId) throws HyracksDataException {
+        // Encode once and size everything from the byte count: the number of
+        // chars differs from the number of bytes for non-ASCII ids.
+        byte[] ncIdBytes = ncId.getBytes(NODE_ID_CHARSET);
+        int initialMsgLength = Integer.BYTES + ncIdBytes.length;
+        if (initialMsgLength > MAX_INITIAL_MESSAGE_SIZE) {
+            throw new HyracksDataException(
+                    "Initial message exceeded maximum size of " + MAX_INITIAL_MESSAGE_SIZE + " bytes");
+        }
+        ByteBuffer initialBuffer = ByteBuffer.allocate(initialMsgLength);
+        initialBuffer.putInt(ncIdBytes.length);
+        initialBuffer.put(ncIdBytes);
+        initialBuffer.flip();
+        return initialBuffer;
+    }
+
+    /**
+     * Prepares every incoming channel to receive its initial message.
+     */
+    private class ChannelOpenListener implements IChannelOpenListener {
+        @Override
+        public void channelOpened(ChannelControlBlock channel) {
+            channel.getReadInterface().setFlushOnCompleteRead(true);
+            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+            // Provide the buffer the initial message is read into.
+            channel.getReadInterface().getEmptyBufferAcceptor()
+                    .accept(ByteBuffer.allocate(MAX_INITIAL_MESSAGE_SIZE));
+        }
+    }
+
+    /**
+     * Reads the node id from the first full buffer of a channel and registers
+     * the channel with the message broker.
+     */
+    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+        private final ChannelControlBlock ccb;
+
+        public InitialBufferAcceptor(ChannelControlBlock ccb) {
+            this.ccb = ccb;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            String nodeId = readAppMessagingInitialMessage(buffer);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Opened messaging channel with node: " + nodeId);
+            }
+            messageBroker.registerMessagingChannel(nodeId, ccb);
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void error(int ecode) {
+
+        }
+
+        private String readAppMessagingInitialMessage(ByteBuffer buffer) {
+            int nodeIdLength = buffer.getInt();
+            byte[] stringBytes = new byte[nodeIdLength];
+            buffer.get(stringBytes);
+            // Decode with the same charset the writer used.
+            return new String(stringBytes, NODE_ID_CHARSET);
+        }
+    }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 4b28aef..2f1a7d1 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -25,6 +25,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -33,7 +34,6 @@
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
import org.apache.hyracks.control.nc.partitions.PartitionManager;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
import org.apache.hyracks.net.exceptions.NetException;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
diff --git
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
index 373fe21..13cb171 100644
---
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -21,24 +21,22 @@
import java.io.FileReader;
import java.io.IOException;
-import java.net.InetAddress;
-import java.util.HashMap;
import java.util.Map;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
-
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
+import org.apache.hyracks.hdfs.utils.TestUtils;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
@SuppressWarnings("deprecation")
public class SchedulerTest extends TestCase {
@@ -60,13 +58,8 @@
* @throws Exception
*/
public void testSchedulerSimple() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new
NetworkAddress("10.0.0.1", 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new
NetworkAddress("10.0.0.2", 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new
NetworkAddress("10.0.0.3", 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new
NetworkAddress("10.0.0.4", 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new
NetworkAddress("10.0.0.5", 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new
NetworkAddress("10.0.0.6", 5098)));
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+ 5098, 5097);
InputSplit[] fileSplits = new InputSplit[6];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] {
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -98,13 +91,17 @@
* @throws Exception
*/
public void testSchedulerLargerHDFS() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new
NetworkAddress("10.0.0.1", 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new
NetworkAddress("10.0.0.2", 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new
NetworkAddress("10.0.0.3", 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new
NetworkAddress("10.0.0.4", 5098)));
- ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", 5099), new
NetworkAddress("10.0.0.5", 5098)));
- ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", 5099), new
NetworkAddress("10.0.0.5", 5098)));
+ int dataPort = 5099;
+ int resultPort = 5098;
+ int messagingPort = 5097;
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.",
+ dataPort, resultPort, messagingPort);
+ ncNameToNcInfos.put("nc7",
+ new NodeControllerInfo("nc7", NodeStatus.ALIVE, new
NetworkAddress("10.0.0.7", dataPort),
+ new NetworkAddress("10.0.0.5", resultPort), new
NetworkAddress("10.0.0.5", messagingPort)));
+ ncNameToNcInfos.put("nc12",
+ new NodeControllerInfo("nc12", NodeStatus.ALIVE, new
NetworkAddress("10.0.0.12", dataPort),
+ new NetworkAddress("10.0.0.5", resultPort), new
NetworkAddress("10.0.0.5", messagingPort)));
InputSplit[] fileSplits = new InputSplit[12];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] {
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -115,7 +112,8 @@
fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] {
"10.0.0.2", "10.0.0.3", "10.0.0.5" });
fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] {
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] {
"10.0.0.3", "10.0.0.4", "10.0.0.5" });
- fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0, new String[]
{ "10.0.0.14", "10.0.0.11", "10.0.0.13" });
+ fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0,
+ new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[]
{ "10.0.0.2", "10.0.0.1", "10.0.0.6" });
fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[]
{ "10.0.0.3", "10.0.0.4", "10.0.0.7" });
fileSplits[11] = new FileSplit(new Path("part-9"), 0, 0, new String[]
{ "10.0.0.4", "10.0.0.5", "10.0.0.6" });
@@ -123,8 +121,8 @@
Scheduler scheduler = new Scheduler(ncNameToNcInfos);
String[] locationConstraints =
scheduler.getLocationConstraints(fileSplits);
- String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1",
"nc3", "nc2", "nc2", "nc3", "nc12",
- "nc7", "nc7", "nc12" };
+ String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1",
"nc3", "nc2", "nc2", "nc3", "nc12", "nc7",
+ "nc7", "nc12" };
for (int i = 0; i < locationConstraints.length; i++) {
Assert.assertEquals(locationConstraints[i], expectedResults[i]);
}
@@ -145,13 +143,8 @@
* @throws Exception
*/
public void testSchedulerSmallerHDFS() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new
NetworkAddress("10.0.0.1", 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new
NetworkAddress("10.0.0.2", 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new
NetworkAddress("10.0.0.3", 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new
NetworkAddress("10.0.0.4", 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new
NetworkAddress("10.0.0.5", 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new
NetworkAddress("10.0.0.6", 5098)));
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+ 5098, 5097);
InputSplit[] fileSplits = new InputSplit[12];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] {
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -191,13 +184,8 @@
* @throws Exception
*/
public void testSchedulerSmallerHDFSOdd() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new
NetworkAddress("10.0.0.1", 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new
NetworkAddress("10.0.0.2", 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new
NetworkAddress("10.0.0.3", 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new
NetworkAddress("10.0.0.4", 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new
NetworkAddress("10.0.0.5", 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new
NetworkAddress("10.0.0.6", 5098)));
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+ 5098, 5097);
InputSplit[] fileSplits = new InputSplit[13];
fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] {
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -239,13 +227,8 @@
* @throws Exception
*/
public void testSchedulercBoundary() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new
NetworkAddress("10.0.0.1", 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new
NetworkAddress("10.0.0.2", 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new
NetworkAddress("10.0.0.3", 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new
NetworkAddress("10.0.0.4", 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new
NetworkAddress("10.0.0.5", 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new
NetworkAddress("10.0.0.6", 5098)));
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+ 5098, 5097);
/** test empty file splits */
InputSplit[] fileSplits = new InputSplit[0];
diff --git
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
index c2ad4a0..8755cf3 100644
---
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
+++
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
@@ -22,6 +22,12 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.comm.NetworkAddress;
public class TestUtils {
@@ -38,8 +44,8 @@
throw new Exception("Actual result changed at line " + num
+ ":\n< " + lineExpected + "\n> ");
}
if (!equalStrings(lineExpected, lineActual)) {
- throw new Exception("Result for changed at line " + num +
":\n< " + lineExpected + "\n> "
- + lineActual);
+ throw new Exception(
+ "Result for changed at line " + num + ":\n< " +
lineExpected + "\n> " + lineActual);
}
++num;
}
@@ -94,4 +100,16 @@
return true;
}
+ public static Map<String, NodeControllerInfo>
generateNodeControllerInfo(int numberOfNodes, String ncNamePrefix,
+ String addressPrefix, int netPort, int dataPort, int
messagingPort) {
+ Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
+ for (int i = 1; i <= numberOfNodes; i++) {
+ String ncId = ncNamePrefix + i;
+ String ncAddress = addressPrefix + i;
+ ncNameToNcInfos.put(ncId,
+ new NodeControllerInfo(ncId, NodeStatus.ALIVE, new
NetworkAddress(ncAddress, netPort),
+ new NetworkAddress(ncAddress, dataPort), new
NetworkAddress(ncAddress, messagingPort)));
+ }
+ return ncNameToNcInfos;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
index 6eabb71..793e029 100644
---
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -19,22 +19,18 @@
package org.apache.hyracks.hdfs2.scheduler;
-import java.net.InetAddress;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.NodeStatus;
-import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.hdfs.utils.TestUtils;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
/**
* Test case for the new HDFS API scheduler
@@ -47,13 +43,8 @@
* @throws Exception
*/
public void testSchedulerSimple() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new
NetworkAddress("10.0.0.1", 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new
NetworkAddress("10.0.0.2", 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new
NetworkAddress("10.0.0.3", 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new
NetworkAddress("10.0.0.4", 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new
NetworkAddress("10.0.0.5", 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new
NetworkAddress("10.0.0.6", 5098)));
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+ 5098, 5097);
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] {
"10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -79,13 +70,8 @@
* @throws Exception
*/
public void testSchedulerLargerHDFS() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new
NetworkAddress("10.0.0.1", 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new
NetworkAddress("10.0.0.2", 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new
NetworkAddress("10.0.0.3", 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new
NetworkAddress("10.0.0.4", 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new
NetworkAddress("10.0.0.5", 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new
NetworkAddress("10.0.0.6", 5098)));
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+ 5098, 5097);
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] {
"10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -118,13 +104,8 @@
* @throws Exception
*/
public void testSchedulerSmallerHDFS() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new
NetworkAddress("10.0.0.1", 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new
NetworkAddress("10.0.0.2", 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new
NetworkAddress("10.0.0.3", 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new
NetworkAddress("10.0.0.4", 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new
NetworkAddress("10.0.0.5", 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new
NetworkAddress("10.0.0.6", 5098)));
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+ 5098, 5097);
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] {
"10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -157,13 +138,8 @@
* @throws Exception
*/
public void testSchedulerSmallerHDFSOdd() throws Exception {
- Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String,
NodeControllerInfo>();
- ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new
NetworkAddress("10.0.0.1", 5098)));
- ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new
NetworkAddress("10.0.0.2", 5098)));
- ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new
NetworkAddress("10.0.0.3", 5098)));
- ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new
NetworkAddress("10.0.0.4", 5098)));
- ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new
NetworkAddress("10.0.0.5", 5098)));
- ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6",
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new
NetworkAddress("10.0.0.6", 5098)));
+ Map<String, NodeControllerInfo> ncNameToNcInfos =
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+ 5098, 5097);
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] {
"10.0.0.1", "10.0.0.2", "10.0.0.3" }));
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 3c354f0..96b87fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -42,6 +42,11 @@
<dependencies>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.18-SNAPSHOT</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 6d82297..100ed05 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -28,8 +28,12 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelReadInterface;
+import org.apache.hyracks.api.comm.IChannelWriteInterface;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.net.exceptions.NetException;
/**
@@ -37,7 +41,7 @@
*
* @author vinayakb
*/
-public class ChannelControlBlock {
+public class ChannelControlBlock implements IChannelControlBlock {
private static final Logger LOGGER =
Logger.getLogger(ChannelControlBlock.class.getName());
private final ChannelSet cSet;
@@ -71,20 +75,12 @@
return channelId;
}
- /**
- * Get the read inderface of this channel.
- *
- * @return the read interface.
- */
+ @Override
public IChannelReadInterface getReadInterface() {
return ri;
}
- /**
- * Get the write interface of this channel.
- *
- * @return the write interface.
- */
+ @Override
public IChannelWriteInterface getWriteInterface() {
return wi;
}
@@ -92,7 +88,7 @@
private final class ReadInterface implements IChannelReadInterface {
private final Deque<ByteBuffer> riEmptyStack;
- private final IBufferAcceptor eba = new IBufferAcceptor() {
+ private IBufferAcceptor eba = new IBufferAcceptor() {
@Override
public void accept(ByteBuffer buffer) {
int delta = buffer.remaining();
@@ -114,6 +110,8 @@
private IBufferFactory bufferFactory;
+ private boolean flushOnCompleteRead = false;
+
ReadInterface() {
riEmptyStack = new ArrayDeque<ByteBuffer>();
credits = 0;
@@ -133,6 +131,16 @@
@Override
public IBufferAcceptor getEmptyBufferAcceptor() {
return eba;
+ }
+
+ @Override
+ public void setEmptyBufferAcceptor(IBufferAcceptor eba) {
+ this.eba = eba;
+ }
+
+ @Override
+ public void setFlushOnCompleteRead(boolean flushOnCompleteRead) {
+ this.flushOnCompleteRead = flushOnCompleteRead;
}
int read(SocketChannel sc, int size) throws IOException, NetException {
@@ -168,7 +176,7 @@
} else {
return size;
}
- if (currentReadBuffer.remaining() <= 0) {
+ if (currentReadBuffer.remaining() <= 0 || flushOnCompleteRead)
{
flush();
}
}
@@ -363,7 +371,8 @@
this.ri.credits = credits;
}
- synchronized void addWriteCredits(int delta) {
+ @Override
+ public synchronized void addWriteCredits(int delta) {
wi.credits += delta;
wi.adjustChannelWritability();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
index f1f3c4a..bf01fdb 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
@@ -28,9 +28,8 @@
import junit.framework.Assert;
import org.junit.Test;
-
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
--
To view, visit https://asterix-gerrit.ics.uci.edu/897
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I5c0bd7c11c1e78954ebceff49cb274d8073a64bd
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>