Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/897

Change subject: Introduce MessagingNetworkManager for NC2NC AppMessaging
......................................................................

Introduce MessagingNetworkManager for NC2NC AppMessaging

This change adds a MessagingNetworkManager to NodeControllerService.
The MessagingNetworkManager opens the channels used for NC-to-NC (NC2NC)
application messaging. These channels are handed to the application through
the IMessageBroker APIs, and the application itself is responsible for
managing them.

Change-Id: I5c0bd7c11c1e78954ebceff49cb274d8073a64bd
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHelper.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
R 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
R 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
C 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
R 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
R 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
R 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
M 
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
A 
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NcMessagingUtil.java
M 
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
M 
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
M 
hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
A 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
M 
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
M 
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
M 
hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
M hyracks-fullstack/hyracks/hyracks-net/pom.xml
M 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
M 
hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
33 files changed, 848 insertions(+), 200 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/97/897/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index cc50b75..0cc9570 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -82,6 +82,7 @@
             ncConfig1.clusterNetIPAddress = "127.0.0.1";
             ncConfig1.dataIPAddress = "127.0.0.1";
             ncConfig1.resultIPAddress = "127.0.0.1";
+            ncConfig1.messagingIPAddress = "127.0.0.1";
             ncConfig1.nodeId = ncName;
             ncConfig1.resultTTL = 30000;
             ncConfig1.resultSweepThreshold = 1000;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 13b0189..836946a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -18,8 +18,10 @@
  */
 package org.apache.asterix.messaging;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -44,40 +46,66 @@
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
 public class NCMessageBroker implements INCMessageBroker {
-    private final static Logger LOGGER = 
Logger.getLogger(NCMessageBroker.class.getName());
+    private static final Logger LOGGER = 
Logger.getLogger(NCMessageBroker.class.getName());
 
     private final NodeControllerService ncs;
     private final AtomicLong messageId = new AtomicLong(0);
     private final Map<Long, IApplicationMessageCallback> callbacks;
     private final IAsterixAppRuntimeContext appContext;
+    private final Map<String, IChannelControlBlock> ncChannels = new 
HashMap<>();
+    private final NCMessagingHelper messagingHelper;
+    private final LinkedBlockingQueue<IApplicationMessage> receivedMsgsQ;
+    private final MessageDeliveryService msgDeliverySvc;
 
     public NCMessageBroker(NodeControllerService ncs) {
         this.ncs = ncs;
         appContext = (IAsterixAppRuntimeContext) 
ncs.getApplicationContext().getApplicationObject();
-        callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+        callbacks = new ConcurrentHashMap<>();
+        messagingHelper = new NCMessagingHelper(ncs, this);
+        receivedMsgsQ = new LinkedBlockingQueue<>();
+        msgDeliverySvc = new MessageDeliveryService();
+        appContext.getThreadExecutor().execute(msgDeliverySvc);
     }
 
     @Override
-    public void sendMessage(IApplicationMessage message, 
IApplicationMessageCallback callback) throws Exception {
-        if (callback != null) {
-            long uniqueMessageId = messageId.incrementAndGet();
-            message.setId(uniqueMessageId);
-            callbacks.put(uniqueMessageId, callback);
-        }
+    public void sendMessageToCC(IApplicationMessage message, 
IApplicationMessageCallback callback) throws Exception {
+        registerMsgCallback(message, callback);
         try {
             
ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
         } catch (Exception e) {
-            if (callback != null) {
-                //remove the callback in case of failure
-                callbacks.remove(message.getId());
-            }
+            handleMsgDeliveryFailure(message);
             throw e;
         }
+    }
+
+    @Override
+    public void sendMessageToNC(String nodeId, IApplicationMessage message, 
IApplicationMessageCallback callback)
+            throws Exception {
+        registerMsgCallback(message, callback);
+        try {
+            IChannelControlBlock ccb = getNCChannel(nodeId);
+            messagingHelper.sendMessageToChannel(ccb, message);
+        } catch (Exception e) {
+            handleMsgDeliveryFailure(message);
+            throw e;
+        }
+    }
+
+    @Override
+    public void registerMessagingChannel(String nodeId, IChannelControlBlock 
ccb) {
+        messagingHelper.configureChannel(ccb);
+        addOpenChannel(nodeId, ccb);
+    }
+
+    @Override
+    public void queueReceivedMessage(IApplicationMessage msg) {
+        receivedMsgsQ.offer(msg);
     }
 
     @Override
@@ -117,8 +145,54 @@
                     break;
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, e.getMessage(), e);
+            }
             throw e;
+        }
+    }
+
+    private void registerMsgCallback(IApplicationMessage message, 
IApplicationMessageCallback callback) {
+        if (callback != null) {
+            long uniqueMessageId = messageId.incrementAndGet();
+            message.setId(uniqueMessageId);
+            callbacks.put(uniqueMessageId, callback);
+        }
+    }
+
+    private void handleMsgDeliveryFailure(IApplicationMessage message) {
+        callbacks.remove(message.getId());
+    }
+
+    private IChannelControlBlock getNCChannel(String nodeId) throws Exception {
+        synchronized (ncChannels) {
+            // check if there is an existing open channel first
+            IChannelControlBlock ncChannel = ncChannels.get(nodeId);
+            if (ncChannels.get(nodeId) == null) {
+                //open a new channel and register it
+                ncChannel = messagingHelper.openChannel(nodeId);
+                registerMessagingChannel(nodeId, ncChannel);
+            }
+            return ncChannel;
+        }
+    }
+
+    private void addOpenChannel(String nodeId, IChannelControlBlock ccb) {
+        synchronized (ncChannels) {
+            if (ncChannels.get(nodeId) == null) {
+                ncChannels.put(nodeId, ccb);
+            } else {
+                //close this channel since another channel was opened with 
this node
+                messagingHelper.closeChannel(ccb);
+                /**
+                 * TODO Currently there is a chance that two nodes will open
+                 * a channel to each other at the exact same time and both will
+                 * end up using a half closed channel. While this isn't a big 
issue,
+                 * it should be eliminated by introducing negotiation protocol
+                 * between nodes to decide which channel to use and which 
channel
+                 * to close fully.
+                 */
+            }
         }
     }
 
@@ -133,7 +207,7 @@
                 //send response after takeover is completed
                 TakeoverPartitionsResponseMessage reponse = new 
TakeoverPartitionsResponseMessage(msg.getRequestId(),
                         appContext.getTransactionSubsystem().getId(), 
msg.getPartitions());
-                sendMessage(reponse, null);
+                sendMessageToCC(reponse, null);
             }
         }
     }
@@ -145,7 +219,7 @@
         } finally {
             TakeoverMetadataNodeResponseMessage reponse = new 
TakeoverMetadataNodeResponseMessage(
                     appContext.getTransactionSubsystem().getId());
-            sendMessage(reponse, null);
+            sendMessageToCC(reponse, null);
         }
     }
 
@@ -156,7 +230,7 @@
         long maxResourceId = 
Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
                 
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
         maxResourceIdMsg.setMaxResourceId(maxResourceId);
-        sendMessage(maxResourceIdMsg, null);
+        sendMessageToCC(maxResourceIdMsg, null);
     }
 
     private void handleReplicaEvent(IMessage message) {
@@ -198,7 +272,7 @@
         //send response after partitions prepared for failback
         PreparePartitionsFailbackResponseMessage reponse = new 
PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
                 msg.getRequestId(), msg.getPartitions());
-        sendMessage(reponse, null);
+        sendMessageToCC(reponse, null);
     }
 
     private void handleCompleteFailbackRequest(IMessage message) throws 
Exception {
@@ -209,7 +283,26 @@
         } finally {
             CompleteFailbackResponseMessage reponse = new 
CompleteFailbackResponseMessage(msg.getPlanId(),
                     msg.getRequestId(), msg.getPartitions());
-            sendMessage(reponse, null);
+            sendMessageToCC(reponse, null);
+        }
+    }
+
+    private class MessageDeliveryService implements Runnable {
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    IApplicationMessage msg = receivedMsgsQ.take();
+                    //TODO add nodeId to IApplicationMessage and pass it
+                    receivedMessage(msg, null);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Could not process message", 
e);
+                    }
+                }
+            }
         }
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHelper.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHelper.java
new file mode 100644
index 0000000..a8908aa
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHelper.java
@@ -0,0 +1,217 @@
+package org.apache.asterix.messaging;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.external.feed.dataflow.FrameAction;
+import org.apache.asterix.external.feed.management.ConcurrentFramePool;
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+
+public class NCMessagingHelper {
+
+    private static final Logger LOGGER = 
Logger.getLogger(NCMessagingHelper.class.getName());
+    //TODO make these values configurable and account for their memory usage
+    private static final int MESSAGING_BUFFER_SIZE = 
StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+    private static final int MESSAGING_MEMORY_BUDGET = 512 * 
MESSAGING_BUFFER_SIZE;
+    private static final int MAX_MESSAGING_BUFFER_PER_CHANNEL = 16;
+
+    /**
+     * A buffer type to indicate whether an empty buffer was used during
+     * a write or read operation.
+     */
+    private enum AppMessagingEmptyBufferType {
+        READ_BUFFER,
+        WRITE_BUFFER
+    }
+
+    /* A single buffer factory used by all messaging channels */
+    private final IBufferFactory appMessagingBufferFactor = new 
AppMessagingBufferFactory();
+
+    private final INCMessageBroker messageBroker;
+    private final ConcurrentFramePool framePool;
+    private final NodeControllerService ncs;
+
+    public NCMessagingHelper(NodeControllerService ncs, INCMessageBroker 
messageBroker) {
+        this.ncs = ncs;
+        this.messageBroker = messageBroker;
+        framePool = new ConcurrentFramePool(ncs.getId(), 
MESSAGING_MEMORY_BUDGET, MESSAGING_BUFFER_SIZE);
+    }
+
+    public IChannelControlBlock openChannel(String nodeId) throws Exception {
+        // Get active nodes info from CC
+        //TODO Don't send a request to CC for node info each time but only on 
node membership changes.
+        Map<String, NodeControllerInfo> nodeControllers = 
ncs.getNodeControllersInfo();
+
+        // Get the node messaging address from its info
+        NodeControllerInfo nodeControllerInfo = nodeControllers.get(nodeId);
+        if (nodeControllerInfo == null) {
+            throw new HyracksDataException("Could not find node: " + nodeId);
+        }
+        NetworkAddress nodeMessagingNeAddress = 
nodeControllerInfo.getMessagingNetworkAddress();
+        SocketAddress nodeAddress = new 
InetSocketAddress(InetAddress.getByName(nodeMessagingNeAddress.getAddress()),
+                nodeMessagingNeAddress.getPort());
+
+        // Open a channel using MessagingNetManager 
+        IChannelControlBlock ccb = 
ncs.getMessagingNetManager().connect(nodeAddress);
+        //Send the initial app messaging handshake message to register the 
opened channel on both nodes
+        ByteBuffer initialBuffer = 
ncs.getMessagingNetManager().constructAppMessagingInitialMessage(ncs.getId());
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(initialBuffer);
+        return ccb;
+    }
+
+    public void configureChannel(IChannelControlBlock ccb) {
+        /**
+         * Set the app messaging buffer factory and acceptors
+         * to the channel read and write interfaces
+         */
+        ccb.getWriteInterface().setEmptyBufferAcceptor(
+                new AppMessagingEmptyBufferAcceptor(ccb, 
AppMessagingEmptyBufferType.WRITE_BUFFER));
+        ccb.getReadInterface().setBufferFactory(appMessagingBufferFactor, 
MAX_MESSAGING_BUFFER_PER_CHANNEL,
+                MESSAGING_BUFFER_SIZE);
+        AppMessagingEmptyBufferAcceptor readEmptyBufferAcceptor = new 
AppMessagingEmptyBufferAcceptor(ccb,
+                AppMessagingEmptyBufferType.READ_BUFFER);
+        ccb.getReadInterface().setEmptyBufferAcceptor(readEmptyBufferAcceptor);
+        ccb.getReadInterface().setFullBufferAcceptor(new 
AppMessagingReadFullBufferAcceptor(readEmptyBufferAcceptor));
+    }
+
+    public void sendMessageToChannel(IChannelControlBlock ccb, 
IApplicationMessage msg) throws IOException {
+        byte[] serializedMsg = JavaSerializationUtils.serialize(msg);
+        if (serializedMsg.length > MESSAGING_BUFFER_SIZE) {
+            throw new HyracksDataException("Message exceded maximum size");
+        }
+        // Prepare the message buffer
+        ByteBuffer msgBuffer = framePool.get();
+        if (msgBuffer == null) {
+            throw new HyracksDataException("Could not get an empty buffer");
+        }
+        msgBuffer.clear();
+        msgBuffer.put(serializedMsg);
+        msgBuffer.flip();
+        // Give the buffer to the channel write interface for writing
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
+    }
+
+    public void closeChannel(IChannelControlBlock ccb) {
+        /**
+         * Close the channel write interface which will result in
+         * sending a CLOSE_CHANNEL request to the remote node of
+         * the channel.
+         */
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    /**
+     * A buffer acceptor that receives the read buffers containing messages 
from
+     * other nodes.
+     */
+    private class AppMessagingReadFullBufferAcceptor implements 
ICloseableBufferAcceptor {
+        private final IBufferAcceptor recycle;
+
+        private AppMessagingReadFullBufferAcceptor(IBufferAcceptor recycle) {
+            this.recycle = recycle;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            try {
+                IApplicationMessage receivedMsg = (IApplicationMessage) 
JavaSerializationUtils
+                        .deserialize(buffer.array());
+                //queue the received message and free the network IO thread
+                messageBroker.queueReceivedMessage(receivedMsg);
+            } catch (ClassNotFoundException | IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, e.getMessage(), e);
+                }
+            } finally {
+                recycle.accept(buffer);
+            }
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void error(int ecode) {
+        }
+    }
+
+    /**
+     * Empty buffer acceptor used to return the used buffers in app messaging
+     * to the buffer pool.
+     */
+    private class AppMessagingEmptyBufferAcceptor implements IBufferAcceptor {
+        private final AppMessagingEmptyBufferType bufferType;
+        private final ChannelControlBlock ccb;
+
+        public AppMessagingEmptyBufferAcceptor(IChannelControlBlock ccb, 
AppMessagingEmptyBufferType bufferType) {
+            this.ccb = (ChannelControlBlock) ccb;
+            this.bufferType = bufferType;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            try {
+                int remaining = buffer.remaining();
+                if (bufferType == AppMessagingEmptyBufferType.READ_BUFFER) {
+                    /**
+                     * add the number of received bytes from this channel
+                     * as write credits to make sure the messaging channel
+                     * full write credit is always available.
+                     */
+                    ccb.addWriteCredits(remaining);
+                }
+                framePool.release(buffer);
+            } catch (HyracksDataException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * A buffer factory based on {@link ConcurrentFramePool}. Used
+     * for messaging channels buffers.
+     */
+    private final class AppMessagingBufferFactory implements IBufferFactory {
+        private final FrameAction frameAction = new FrameAction();
+
+        @Override
+        public ByteBuffer createBuffer() {
+            ByteBuffer buffer = framePool.get();
+            if (buffer == null) {
+                try {
+                    framePool.subscribe(frameAction);
+                    buffer = frameAction.retrieve();
+                } catch (HyracksDataException e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, e.getMessage(), e);
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            return buffer;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index fba74e8..bcf7a8c 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -43,7 +43,7 @@
 
     /**
      * Sets a unique message id that identifies this message within an NC.
-     * This id is set by {@link 
INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+     * This id is set by {@link 
INCMessageBroker#sendMessageToCC(IApplicationMessage, 
IApplicationMessageCallback)}
      * when the callback is not null to notify the sender when the response to 
that message is received.
      *
      * @param messageId
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 41f8a0c..7628167 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,17 @@
      * @param callback
      * @throws Exception
      */
-    public void sendMessage(IApplicationMessage message, 
IApplicationMessageCallback callback) throws Exception;
+    public void sendMessageToCC(IApplicationMessage message, 
IApplicationMessageCallback callback) throws Exception;
+
+    /**
+     * Sends application message from this NC to another NC.
+     *
+     * @param message
+     * @param callback
+     * @throws Exception
+     */
+    public void sendMessageToNC(String nodeId, IApplicationMessage message, 
IApplicationMessageCallback callback)
+            throws Exception;
 
     /**
      * Sends the maximum resource id on this NC to the CC.
@@ -37,4 +47,11 @@
      * @throws Exception
      */
     public void reportMaxResourceId() throws Exception;
+
+    /**
+     * Queue a message to this {@link INCMessageBroker} for processing
+     *
+     * @param msg
+     */
+    public void queueReceivedMessage(IApplicationMessage msg);
 }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
index 5b29530..ab1ebe1 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -57,7 +57,7 @@
             //if no response available or it has an exception, request a new 
one
             if (reponse == null || reponse.getException() != null) {
                 ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
-                ((INCMessageBroker) 
appCtx.getMessageBroker()).sendMessage(msg, this);
+                ((INCMessageBroker) 
appCtx.getMessageBroker()).sendMessageToCC(msg, this);
                 reponse = (ResourceIdRequestResponseMessage) 
resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
                     throw new 
HyracksDataException(reponse.getException().getMessage());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
index c41dafe..a79b955 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeControllerInfo.java
@@ -33,12 +33,15 @@
 
     private final NetworkAddress datasetNetworkAddress;
 
+    private final NetworkAddress messagingNetworkAddress;
+
     public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress 
netAddress,
-            NetworkAddress datasetNetworkAddress) {
+            NetworkAddress datasetNetworkAddress, NetworkAddress 
messagingNetworkAddress) {
         this.nodeId = nodeId;
         this.status = status;
         this.netAddress = netAddress;
         this.datasetNetworkAddress = datasetNetworkAddress;
+        this.messagingNetworkAddress = messagingNetworkAddress;
     }
 
     public String getNodeId() {
@@ -56,4 +59,8 @@
     public NetworkAddress getDatasetNetworkAddress() {
         return datasetNetworkAddress;
     }
+
+    public NetworkAddress getMessagingNetworkAddress() {
+        return messagingNetworkAddress;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
similarity index 95%
rename from 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
rename to 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
index d7febd2..d69ddd7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/IBufferAcceptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferAcceptor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
 
 import java.nio.ByteBuffer;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IBufferFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
similarity index 94%
rename from 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IBufferFactory.java
rename to 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
index 53299d8..5188a43 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IBufferFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IBufferFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.protocols.muxdemux;
+package org.apache.hyracks.api.comm;
 
 import java.nio.ByteBuffer;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
similarity index 62%
copy from 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
copy to 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
index 1697cfd..4b327b3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelControlBlock.java
@@ -16,25 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
 
-/**
- * A buffer acceptor that can be closed to indicate end of transmission or an 
error code
- * specified to indicate an error in transmission.
- *
- * @author vinayakb
- */
-public interface ICloseableBufferAcceptor extends IBufferAcceptor {
-    /**
-     * Close the buffer acceptor.
-     */
-    public void close();
+public interface IChannelControlBlock {
 
     /**
-     * Indicate that an error occurred.
+     * Get the read interface of this channel.
      *
-     * @param ecode
-     *            - the error code.
+     * @return the read interface.
      */
-    public void error(int ecode);
+    IChannelReadInterface getReadInterface();
+
+    /**
+     * Get the write interface of this channel.
+     *
+     * @return the write interface.
+     */
+    IChannelWriteInterface getWriteInterface();
+
+    /**
+     * Add write credit to this channel.
+     *
+     * @param delta
+     *            number of bytes
+     */
+    public void addWriteCredits(int delta);
+
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
similarity index 77%
rename from 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
rename to 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
index 8639fb7..86189f1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
@@ -16,13 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.protocols.muxdemux;
-
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+package org.apache.hyracks.api.comm;
 
 /**
- * Represents the read interface of a {@link ChannelControlBlock}.
+ * Represents the read interface of a {@link IChannelControlBlock}.
  *
  * @author vinayakb
  */
@@ -56,4 +53,19 @@
      *            - the size of each buffer
      */
     public void setBufferFactory(IBufferFactory bufferFactory, int limit, int 
frameSize);
+
+    /**
+     * Set a flag to indicate whether to flush the current read buffer
+     * on complete reads or wait until the buffer is completely filled
+     *
+     * @param flushOnCompleteRead
+     */
+    public void setFlushOnCompleteRead(boolean flushOnCompleteRead);
+
+    /**
+     * Overrides the default empty buffer acceptor of this {@link 
IChannelReadInterface}
+     *
+     * @param eba
+     */
+    public void setEmptyBufferAcceptor(IBufferAcceptor eba);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
similarity index 91%
rename from 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
rename to 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
index dc38ea4..6c4ef0e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelWriteInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelWriteInterface.java
@@ -16,10 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.protocols.muxdemux;
-
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+package org.apache.hyracks.api.comm;
 
 /**
- * Represents the write interface of a {@link ChannelControlBlock}.
+ * Represents the write interface of a {@link IChannelControlBlock}.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
similarity index 96%
rename from 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
rename to 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
index 1697cfd..fba0eaf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/buffers/ICloseableBufferAcceptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.net.buffers;
+package org.apache.hyracks.api.comm;
 
 /**
  * A buffer acceptor that can be closed to indicate end of transmission or an 
error code
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
index 2b166d2..7ba9581 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
@@ -18,11 +18,13 @@
  */
 package org.apache.hyracks.api.messages;
 
-/**
- * @author rico
- */
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+
 public interface IMessageBroker {
 
     public void receivedMessage(IMessage message, String nodeId) throws 
Exception;
 
+    public default void registerMessagingChannel(String nodeId, 
IChannelControlBlock ccb) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
index fba7cf5..8f72bb6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -27,12 +27,12 @@
 
 import org.apache.hyracks.api.channels.IInputChannel;
 import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class DatasetNetworkInputChannel implements IInputChannel {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NcMessagingUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NcMessagingUtil.java
new file mode 100644
index 0000000..17ae249
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NcMessagingUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.comm.channels;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+
+/**
+ * Helpers for building and parsing the initial buffer written on a newly opened
+ * network channel. The first byte of that buffer identifies the
+ * {@link ChannelType}; the remainder is a payload whose layout depends on the type.
+ */
+public class NcMessagingUtil {
+
+    public static final int MAX_INITIAL_MESSAGE_SIZE = StorageUtil.getSizeInBytes(2, StorageUnit.KILOBYTE);
+
+    // Node ids are encoded explicitly as UTF-8 so that the writer and the reader agree
+    // on the byte representation regardless of each JVM's default charset.
+    private static final java.nio.charset.Charset NODE_ID_CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+
+    private NcMessagingUtil() {
+        throw new AssertionError("This util class should not be initialized");
+    }
+
+    /**
+     * The kinds of channel that can be opened, each tagged with the byte written at
+     * the start of the initial buffer and the size of the fixed part of that buffer.
+     */
+    public enum ChannelType {
+        // 1 type byte + 8 byte job id + 3 ints (connector id, sender index, receiver index)
+        DATA_PARTITION((byte) 0, 21),
+        // 1 type byte + 1 int holding the length of the encoded node id that follows
+        APP_MESSAGING((byte) 1, 5);
+
+        private static final Map<Byte, ChannelType> TYPES = Collections.unmodifiableMap(initializeMapping());
+        private final int initialMessageBaseSize;
+        private final byte type;
+
+        ChannelType(byte type, int initialMessageBaseSize) {
+            this.type = type;
+            this.initialMessageBaseSize = initialMessageBaseSize;
+        }
+
+        /**
+         * @return the byte that identifies this channel type in an initial buffer.
+         */
+        public byte getType() {
+            return type;
+        }
+
+        /**
+         * @return the size in bytes of the fixed-length part of this type's initial buffer.
+         */
+        public int getInitialMessageBaseSize() {
+            return initialMessageBaseSize;
+        }
+
+        /**
+         * Looks up a channel type by its identifying byte.
+         *
+         * @param type
+         *            the type byte read from an initial buffer
+         * @return the matching channel type, or {@code null} if the byte is not a known type.
+         */
+        public static ChannelType getChannelType(byte type) {
+            return TYPES.get(type);
+        }
+
+        private static Map<Byte, ChannelType> initializeMapping() {
+            Map<Byte, ChannelType> mapping = new HashMap<>();
+            for (ChannelType s : ChannelType.values()) {
+                mapping.put(s.getType(), s);
+            }
+            return mapping;
+        }
+    }
+
+    /**
+     * Builds the initial buffer of a data partition channel: the type byte followed by
+     * the job id, connector descriptor id, sender index and receiver index.
+     *
+     * @param partitionId
+     *            the partition the channel is opened for
+     * @return a buffer flipped for reading that holds the encoded message.
+     */
+    public static ByteBuffer getDataPartitionChannelInitialBuffer(PartitionId partitionId) {
+        ByteBuffer initialBuffer = ByteBuffer.allocate(ChannelType.DATA_PARTITION.getInitialMessageBaseSize());
+        initialBuffer.put(ChannelType.DATA_PARTITION.getType());
+        initialBuffer.putLong(partitionId.getJobId().getId());
+        initialBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+        initialBuffer.putInt(partitionId.getSenderIndex());
+        initialBuffer.putInt(partitionId.getReceiverIndex());
+        initialBuffer.flip();
+        return initialBuffer;
+    }
+
+    /**
+     * Builds the initial buffer of an application messaging channel: the type byte,
+     * the length in bytes of the UTF-8 encoded node id, then the encoded node id.
+     *
+     * @param nodeId
+     *            the node id to write into the buffer
+     * @return a buffer flipped for reading that holds the encoded message.
+     */
+    public static ByteBuffer getMessagingChannelInitialBuffer(String nodeId) {
+        // Size the buffer and the length prefix by the encoded byte count. String.length()
+        // counts UTF-16 code units and under-allocates for ids with non-ASCII characters.
+        byte[] nodeIdBytes = nodeId.getBytes(NODE_ID_CHARSET);
+        ByteBuffer initialBuffer = ByteBuffer
+                .allocate(ChannelType.APP_MESSAGING.getInitialMessageBaseSize() + nodeIdBytes.length);
+        initialBuffer.put(ChannelType.APP_MESSAGING.getType());
+        initialBuffer.putInt(nodeIdBytes.length);
+        initialBuffer.put(nodeIdBytes);
+        initialBuffer.flip();
+        return initialBuffer;
+    }
+
+    /**
+     * Parses the payload of a data partition initial buffer. The channel type byte is
+     * not read here, so it is expected to have been consumed from the buffer already.
+     *
+     * @param buffer
+     *            the buffer positioned at the job id
+     * @return the partition id described by the payload.
+     */
+    public static PartitionId readDataPartitionInitialMessage(ByteBuffer buffer) {
+        JobId jobId = new JobId(buffer.getLong());
+        ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
+        int senderIndex = buffer.getInt();
+        int receiverIndex = buffer.getInt();
+        return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
+    }
+
+    /**
+     * Parses the payload of an application messaging initial buffer. The channel type
+     * byte is not read here, so it is expected to have been consumed from the buffer already.
+     *
+     * @param buffer
+     *            the buffer positioned at the node id length
+     * @return the node id decoded as UTF-8.
+     * @throws java.nio.BufferUnderflowException
+     *             if the buffer holds fewer bytes than the encoded length announces
+     */
+    public static String readAppMessagingInitialMessage(ByteBuffer buffer) {
+        int nodeIdLength = buffer.getInt();
+        if (nodeIdLength > buffer.remaining()) {
+            // Fail before allocating: a corrupt length must not trigger a huge array allocation.
+            // This is the same exception buffer.get(byte[]) would raise for a short buffer.
+            throw new java.nio.BufferUnderflowException();
+        }
+        byte[] nodeIdBytes = new byte[nodeIdLength];
+        buffer.get(nodeIdBytes);
+        return new String(nodeIdBytes, NODE_ID_CHARSET);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 235536f..7938fc1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -27,11 +27,11 @@
 
 import org.apache.hyracks.api.channels.IInputChannel;
 import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.partitions.PartitionId;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class NetworkInputChannel implements IInputChannel {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 46220de..60e2e35 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -22,9 +22,9 @@
 import java.util.ArrayDeque;
 import java.util.Deque;
 
+import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class NetworkOutputChannel implements IFrameWriter {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
index 3aa77b9..7628cd2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
@@ -20,8 +20,8 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.net.protocols.muxdemux.IBufferFactory;
 
 /**
  * @author yingyib
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index d93ba0d..ff5832a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -23,10 +23,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.base.INodeController;
@@ -35,6 +31,9 @@
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import 
org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public class NodeControllerState {
     private static final int RRD_SIZE = 720;
@@ -46,6 +45,8 @@
     private final NetworkAddress dataPort;
 
     private final NetworkAddress datasetPort;
+
+    private final NetworkAddress messagingPort;
 
     private final Set<JobId> activeJobIds;
 
@@ -142,6 +143,7 @@
         ncConfig = reg.getNCConfig();
         dataPort = reg.getDataPort();
         datasetPort = reg.getDatasetPort();
+        messagingPort = reg.getMessagingPort();
         activeJobIds = new HashSet<JobId>();
 
         osName = reg.getOSName();
@@ -264,6 +266,10 @@
         return datasetPort;
     }
 
+    public NetworkAddress getMessagingPort() {
+        return messagingPort;
+    }
+
     public JSONObject toSummaryJSON() throws JSONException {
         JSONObject o = new JSONObject();
         o.put("node-id", ncConfig.nodeId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 785a202..726bf12 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -40,11 +40,11 @@
 
     @Override
     public void run() {
-        Map<String, NodeControllerInfo> result = new LinkedHashMap<String, 
NodeControllerInfo>();
+        Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
-            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), 
NodeStatus.ALIVE, e.getValue().getDataPort(), e
-                    .getValue().getDatasetPort()));
+            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), 
NodeStatus.ALIVE, e.getValue().getDataPort(),
+                    e.getValue().getDatasetPort(), 
e.getValue().getMessagingPort()));
         }
         callback.setValue(result);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index b408083..6eb5264 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,18 +18,18 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.control.common.application.IniApplicationConfig;
 import org.ini4j.Ini;
 import org.kohsuke.args4j.Argument;
 import org.kohsuke.args4j.Option;
 import org.kohsuke.args4j.spi.StopOptionHandler;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.Map;
 
 public class NCConfig implements Serializable {
     private static final long serialVersionUID = 2L;
@@ -82,7 +82,7 @@
     @Option(name = "-result-public-port", usage = "Public IP port to announce 
dataset result distribution listener (default: same as -result-port; must set 
-result-public-ip-address also)", required = false)
     public int resultPublicPort = 0;
 
-    @Option(name = "-retries", usage ="Number of attempts to contact CC before 
giving up (default = 5)")
+    @Option(name = "-retries", usage = "Number of attempts to contact CC 
before giving up (default = 5)")
     public int retries = 5;
 
     @Option(name = "-iodevices", usage = "Comma separated list of IO Device 
mount points (default: One device in default temp folder)", required = false)
@@ -112,6 +112,19 @@
     @Option(name = "-config-file", usage = "Specify path to local 
configuration file (default: no local config)", required = false)
     public String configFile = null;
 
+    //TODO add messaging values to NC start scripts
+    @Option(name = "-messaging-ip-address", usage = "IP Address to bind 
messaging listener (default: same as -address)", required = false)
+    public String messagingIPAddress;
+
+    @Option(name = "-messaging-port", usage = "IP port to bind messaging 
listener (default: random port)", required = false)
+    public int messagingPort = 0;
+
+    @Option(name = "-messaging-public-ip-address", usage = "Public IP Address 
to announce messaging listener (default: same as -messaging-ip-address)", 
required = false)
+    public String messagingPublicIPAddress;
+
+    // The companion option is the public IP address, as in the usage text of -result-public-port.
+    @Option(name = "-messaging-public-port", usage = "Public IP port to announce messaging listener (default: same as -messaging-port; must set -messaging-public-ip-address also)", required = false)
+    public int messagingPublicPort = 0;
+
     @Argument
     @Option(name = "--", handler = StopOptionHandler.class)
     public List<String> appArgs;
@@ -139,13 +152,14 @@
         resultIPAddress = IniUtils.getString(ini, nodeSection, 
"result.address", resultIPAddress);
         resultPort = IniUtils.getInt(ini, nodeSection, "result.port", 
resultPort);
 
-        clusterNetPublicIPAddress = IniUtils.getString(
-                ini, nodeSection, "public.cluster.address", 
clusterNetPublicIPAddress);
+        clusterNetPublicIPAddress = IniUtils.getString(ini, nodeSection, 
"public.cluster.address",
+                clusterNetPublicIPAddress);
         clusterNetPublicPort = IniUtils.getInt(ini, nodeSection, 
"public.cluster.port", clusterNetPublicPort);
         dataPublicIPAddress = IniUtils.getString(ini, nodeSection, 
"public.data.address", dataPublicIPAddress);
         dataPublicPort = IniUtils.getInt(ini, nodeSection, "public.data.port", 
dataPublicPort);
         resultPublicIPAddress = IniUtils.getString(ini, nodeSection, 
"public.result.address", resultPublicIPAddress);
         resultPublicPort = IniUtils.getInt(ini, nodeSection, 
"public.result.port", resultPublicPort);
+        //TODO pass messaging info from ini file
 
         retries = IniUtils.getInt(ini, nodeSection, "retries", retries);
 
@@ -169,23 +183,32 @@
         }
 
         // "address" is the default for all IP addresses
-        if (clusterNetIPAddress == null) clusterNetIPAddress = ipAddress;
-        if (dataIPAddress == null) dataIPAddress = ipAddress;
-        if (resultIPAddress == null) resultIPAddress = ipAddress;
+        if (clusterNetIPAddress == null)
+            clusterNetIPAddress = ipAddress;
+        if (dataIPAddress == null)
+            dataIPAddress = ipAddress;
+        if (resultIPAddress == null)
+            resultIPAddress = ipAddress;
 
         // All "public" options default to their "non-public" versions
-        if (clusterNetPublicIPAddress == null) clusterNetPublicIPAddress = 
clusterNetIPAddress;
-        if (clusterNetPublicPort == 0) clusterNetPublicPort = clusterNetPort;
-        if (dataPublicIPAddress == null) dataPublicIPAddress = dataIPAddress;
-        if (dataPublicPort == 0) dataPublicPort = dataPort;
-        if (resultPublicIPAddress == null) resultPublicIPAddress = 
resultIPAddress;
-        if (resultPublicPort == 0) resultPublicPort = resultPort;
+        if (clusterNetPublicIPAddress == null)
+            clusterNetPublicIPAddress = clusterNetIPAddress;
+        if (clusterNetPublicPort == 0)
+            clusterNetPublicPort = clusterNetPort;
+        if (dataPublicIPAddress == null)
+            dataPublicIPAddress = dataIPAddress;
+        if (dataPublicPort == 0)
+            dataPublicPort = dataPort;
+        if (resultPublicIPAddress == null)
+            resultPublicIPAddress = resultIPAddress;
+        if (resultPublicPort == 0)
+            resultPublicPort = resultPort;
     }
 
     /**
      * @return An IApplicationConfig representing this NCConfig.
-     * Note: Currently this only includes the values from the configuration
-     * file, not anything specified on the command-line. QQQ
+     *         Note: Currently this only includes the values from the 
configuration
+     *         file, not anything specified on the command-line. QQQ
      */
     public IApplicationConfig getAppConfig() {
         return new IniApplicationConfig(ini);
@@ -222,6 +245,14 @@
         cList.add(resultPublicIPAddress);
         cList.add("-result-public-port");
         cList.add(String.valueOf(resultPublicPort));
+        cList.add("-messaging-ip-address");
+        cList.add(messagingIPAddress);
+        cList.add("-messaging-port");
+        cList.add(String.valueOf(messagingPort));
+        cList.add("-messaging-public-ip-address");
+        cList.add(messagingPublicIPAddress);
+        cList.add("-messaging-public-port");
+        cList.add(String.valueOf(messagingPublicPort));
         cList.add("-retries");
         cList.add(String.valueOf(retries));
         cList.add("-iodevices");
@@ -275,7 +306,10 @@
         configuration.put("result-time-to-live", String.valueOf(resultTTL));
         configuration.put("result-sweep-threshold", 
String.valueOf(resultSweepThreshold));
         configuration.put("result-manager-memory", 
String.valueOf(resultManagerMemory));
-
+        configuration.put("messaging-ip-address", messagingIPAddress);
+        configuration.put("messaging-port", String.valueOf(messagingPort));
+        configuration.put("messaging-public-ip-address", 
messagingPublicIPAddress);
+        configuration.put("messaging-public-port", 
String.valueOf(messagingPublicPort));
         if (appNCMainClass != null) {
             configuration.put("app-nc-main-class", appNCMainClass);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 5a23455..bb8022e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -65,10 +65,13 @@
 
     private final HeartbeatSchema hbSchema;
 
+    private final NetworkAddress messagingPort;
+
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, 
NCConfig ncConfig, NetworkAddress dataPort,
             NetworkAddress datasetPort, String osName, String arch, String 
osVersion, int nProcessors, String vmName,
             String vmVersion, String vmVendor, String classpath, String 
libraryPath, String bootClasspath,
-            List<String> inputArguments, Map<String, String> systemProperties, 
HeartbeatSchema hbSchema) {
+            List<String> inputArguments, Map<String, String> systemProperties, 
HeartbeatSchema hbSchema,
+            NetworkAddress messagingPort) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
         this.ncConfig = ncConfig;
@@ -87,6 +90,7 @@
         this.inputArguments = inputArguments;
         this.systemProperties = systemProperties;
         this.hbSchema = hbSchema;
+        this.messagingPort = messagingPort;
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -160,4 +164,8 @@
     public Map<String, String> getSystemProperties() {
         return systemProperties;
     }
+
+    public NetworkAddress getMessagingPort() {
+        return messagingPort;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2f8def1..620f3ca 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -73,6 +73,7 @@
 import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
 import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
 import org.apache.hyracks.control.nc.net.DatasetNetworkManager;
+import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
 import org.apache.hyracks.control.nc.net.NetworkManager;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
@@ -157,6 +158,8 @@
     private boolean shuttedDown = false;
 
     private IIOCounter ioCounter;
+
+    private MessagingNetworkManager messagingNetManager;
 
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
@@ -248,6 +251,9 @@
         datasetNetworkManager = new 
DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
                 datasetPartitionManager, ncConfig.nNetThreads, 
ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
                 ncConfig.resultPublicPort);
+        messagingNetManager = new 
MessagingNetworkManager(appCtx.getMessageBroker(), ncConfig.messagingIPAddress,
+                ncConfig.messagingPort, ncConfig.nNetThreads, 
ncConfig.messagingPublicIPAddress,
+                ncConfig.messagingPublicPort);
     }
 
     @Override
@@ -260,7 +266,9 @@
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle = ipc.getHandle(new 
InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
+        messagingNetManager.start();
+        IIPCHandle ccIPCHandle = ipc.getHandle(new 
InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort),
+                ncConfig.retries);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new 
HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
@@ -270,12 +278,12 @@
         // Use "public" versions of network addresses and ports
         NetworkAddress datasetAddress = 
datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, 
ncConfig, netAddress,
-                datasetAddress, osMXBean.getName(), osMXBean.getArch(), 
osMXBean
-                        .getVersion(), osMXBean.getAvailableProcessors(), 
runtimeMXBean.getVmName(), runtimeMXBean
-                        .getVmVersion(), runtimeMXBean.getVmVendor(), 
runtimeMXBean.getClassPath(), runtimeMXBean
-                        .getLibraryPath(), runtimeMXBean.getBootClassPath(), 
runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema));
+        NetworkAddress meesagingPort = 
messagingNetManager.getPublicNetworkAddress();
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, 
ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), 
osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), 
runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), 
runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), 
runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort));
 
         synchronized (this) {
             while (registrationPending) {
@@ -569,6 +577,10 @@
         return datasetPartitionManager;
     }
 
+    /**
+     * @return the {@link MessagingNetworkManager} held by this node controller service
+     */
+    public MessagingNetworkManager getMessagingNetManager() {
+        return this.messagingNetManager;
+    }
+
     /**
      * Shutdown hook that invokes {@link NCApplicationEntryPoint#stop() stop} 
method.
      */
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
index 3c99ef3..a7c5f78 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -25,6 +25,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.dataset.ResultSetId;
@@ -32,7 +33,6 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
 import org.apache.hyracks.net.exceptions.NetException;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
new file mode 100644
index 0000000..a04a939
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
@@ -0,0 +1,138 @@
+package org.apache.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
+import org.apache.hyracks.net.exceptions.NetException;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+
+/**
+ * Network manager for node-controller-to-node-controller application messaging.
+ * <p>
+ * Wraps a {@link MuxDemux} bound to the configured address. Outgoing channels are opened with
+ * {@link #connect(SocketAddress)}. Every incoming channel is expected to begin with the initial message
+ * produced by {@link #constructAppMessagingInitialMessage(String)}; once that message has been read, the
+ * channel is handed to the {@link IMessageBroker} under the sender's node id.
+ * <p>
+ * Initial message layout: a 4-byte {@code int} holding the number of bytes that follow, then the node id
+ * encoded as UTF-8.
+ */
+public class MessagingNetworkManager implements IChannelConnectionFactory {
+
+    /** Upper bound, in bytes, on the initial message (length prefix plus encoded node id). */
+    public static final int MAX_INITIAL_MESSAGE_SIZE = StorageUtil.getSizeInBytes(2, StorageUnit.KILOBYTE);
+    private static final Logger LOGGER = Logger.getLogger(MessagingNetworkManager.class.getName());
+    private static final int MAX_CONNECTION_ATTEMPTS = 5;
+    private final MuxDemux md;
+    private final IMessageBroker messageBroker;
+    private NetworkAddress localNetworkAddress;
+    private NetworkAddress publicNetworkAddress;
+
+    /**
+     * Creates the manager; nothing is started until {@link #start()} is called.
+     *
+     * @param messageBroker
+     *            broker that receives every incoming channel once its initial message has been read
+     * @param inetAddress
+     *            local address the underlying {@link MuxDemux} is created with
+     * @param inetPort
+     *            local port the underlying {@link MuxDemux} is created with
+     * @param nThreads
+     *            thread count passed to the {@link MuxDemux}
+     * @param publicInetAddress
+     *            public address to report; when {@code null}, {@link #start()} substitutes the local address
+     * @param publicInetPort
+     *            public port to report; when {@code 0}, {@link #start()} substitutes the local port
+     */
+    public MessagingNetworkManager(IMessageBroker messageBroker, String inetAddress, int inetPort, int nThreads,
+            String publicInetAddress, int publicInetPort) {
+        this.messageBroker = messageBroker;
+        md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
+                MAX_CONNECTION_ATTEMPTS);
+        publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
+    }
+
+    /**
+     * Starts the underlying {@link MuxDemux} and resolves the local and public network addresses.
+     *
+     * @throws IOException
+     *             if the {@link MuxDemux} fails to start
+     */
+    public void start() throws IOException {
+        md.start();
+        InetSocketAddress sockAddr = md.getLocalAddress();
+        localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), sockAddr.getPort());
+
+        // See if the public address was explicitly specified, and if not,
+        // make it a copy of localNetworkAddress
+        if (publicNetworkAddress.getAddress() == null) {
+            publicNetworkAddress = localNetworkAddress;
+        } else if (publicNetworkAddress.getPort() == 0) {
+            // Likewise for public port
+            publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort());
+        }
+    }
+
+    /**
+     * Currently a no-op: nothing is released here.
+     */
+    public void stop() {
+        // intentionally empty
+    }
+
+    /**
+     * Opens a new messaging channel to {@code remoteAddress}.
+     *
+     * @param remoteAddress
+     *            address of the remote messaging endpoint; it is cast to {@link InetSocketAddress}
+     * @return the newly opened channel, with flush-on-complete-read enabled on its read interface
+     */
+    @Override
+    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+        MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+        ChannelControlBlock openChannel = mConn.openChannel();
+        // Since this is a messaging channel, it might not send complete frames.
+        // Therefore, make it flush on every read message.
+        openChannel.getReadInterface().setFlushOnCompleteRead(true);
+        return openChannel;
+    }
+
+    /**
+     * @return the performance counters of the underlying {@link MuxDemux}
+     */
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
+        return md.getPerformanceCounters();
+    }
+
+    /**
+     * @return the public address of this messaging endpoint; the fallbacks to the local address and port are
+     *         only applied once {@link #start()} has run
+     */
+    public NetworkAddress getPublicNetworkAddress() {
+        return publicNetworkAddress;
+    }
+
+    /**
+     * Builds the first message sent on a newly opened messaging channel, identifying the sending node.
+     *
+     * @param ncId
+     *            id of the sending node controller
+     * @return a buffer, flipped and ready to be written, holding the 4-byte length prefix followed by the
+     *         UTF-8 bytes of {@code ncId}
+     * @throws HyracksDataException
+     *             if the message would be larger than {@link #MAX_INITIAL_MESSAGE_SIZE}
+     */
+    public ByteBuffer constructAppMessagingInitialMessage(String ncId) throws HyracksDataException {
+        // Encode once with an explicit charset: the prefix and the buffer size must be the number of encoded
+        // bytes (not the number of chars), and the receiver must be able to decode with the same charset.
+        byte[] ncIdBytes = ncId.getBytes(StandardCharsets.UTF_8);
+        int initialMsgLength = Integer.BYTES + ncIdBytes.length;
+        if (initialMsgLength > MAX_INITIAL_MESSAGE_SIZE) {
+            throw new HyracksDataException(
+                    "Initial message exceded maximum size of " + MAX_INITIAL_MESSAGE_SIZE + " bytes");
+        }
+        ByteBuffer initialBuffer = ByteBuffer.allocate(initialMsgLength);
+        initialBuffer.putInt(ncIdBytes.length);
+        initialBuffer.put(ncIdBytes);
+        initialBuffer.flip();
+        return initialBuffer;
+    }
+
+    /**
+     * Prepares every channel opened by a remote node to receive its initial message.
+     */
+    private class ChannelOpenListener implements IChannelOpenListener {
+        @Override
+        public void channelOpened(ChannelControlBlock channel) {
+            channel.getReadInterface().setFlushOnCompleteRead(true);
+            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+            // Supply one empty buffer large enough for the biggest allowed initial message.
+            channel.getReadInterface().getEmptyBufferAcceptor()
+                    .accept(ByteBuffer.allocate(MAX_INITIAL_MESSAGE_SIZE));
+        }
+    }
+
+    /**
+     * Reads the initial message of an incoming channel and registers the channel with the message broker
+     * under the node id found in that message.
+     */
+    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+        private final ChannelControlBlock ccb;
+
+        public InitialBufferAcceptor(ChannelControlBlock ccb) {
+            this.ccb = ccb;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            String nodeId = readAppMessagingInitialMessage(buffer);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Opened messaging channel with node: " + nodeId);
+            }
+            messageBroker.registerMessagingChannel(nodeId, ccb);
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+
+        @Override
+        public void error(int ecode) {
+            // no-op
+        }
+
+        /**
+         * Decodes the message written by {@code constructAppMessagingInitialMessage}: a 4-byte byte count
+         * followed by that many UTF-8 bytes.
+         */
+        private String readAppMessagingInitialMessage(ByteBuffer buffer) {
+            int nodeIdLength = buffer.getInt();
+            byte[] stringBytes = new byte[nodeIdLength];
+            buffer.get(stringBytes);
+            return new String(stringBytes, StandardCharsets.UTF_8);
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 4b28aef..2f1a7d1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -25,6 +25,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -33,7 +34,6 @@
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
 import org.apache.hyracks.net.exceptions.NetException;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
index 373fe21..13cb171 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -21,24 +21,22 @@
 
 import java.io.FileReader;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.util.HashMap;
 import java.util.Map;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
-
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.topology.TopologyDefinitionParser;
+import org.apache.hyracks.hdfs.utils.TestUtils;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
 
 @SuppressWarnings("deprecation")
 public class SchedulerTest extends TestCase {
@@ -60,13 +58,8 @@
      * @throws Exception
      */
     public void testSchedulerSimple() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, 
NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new 
NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new 
NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new 
NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new 
NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new 
NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = 
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         InputSplit[] fileSplits = new InputSplit[6];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { 
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -98,13 +91,17 @@
      * @throws Exception
      */
     public void testSchedulerLargerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, 
NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new 
NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new 
NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new 
NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new 
NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
+        int dataPort = 5099;
+        int resultPort = 5098;
+        int messagingPort = 5097;
+        Map<String, NodeControllerInfo> ncNameToNcInfos = 
TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.",
+                dataPort, resultPort, messagingPort);
+        ncNameToNcInfos.put("nc7",
+                new NodeControllerInfo("nc7", NodeStatus.ALIVE, new 
NetworkAddress("10.0.0.7", dataPort),
+                        new NetworkAddress("10.0.0.5", resultPort), new 
NetworkAddress("10.0.0.5", messagingPort)));
+        ncNameToNcInfos.put("nc12",
+                new NodeControllerInfo("nc12", NodeStatus.ALIVE, new 
NetworkAddress("10.0.0.12", dataPort),
+                        new NetworkAddress("10.0.0.5", resultPort), new 
NetworkAddress("10.0.0.5", messagingPort)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { 
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -115,7 +112,8 @@
         fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { 
"10.0.0.2", "10.0.0.3", "10.0.0.5" });
         fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { 
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
         fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { 
"10.0.0.3", "10.0.0.4", "10.0.0.5" });
-        fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0, new String[] 
{ "10.0.0.14", "10.0.0.11", "10.0.0.13" });
+        fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0,
+                new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
         fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] 
{ "10.0.0.2", "10.0.0.1", "10.0.0.6" });
         fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] 
{ "10.0.0.3", "10.0.0.4", "10.0.0.7" });
         fileSplits[11] = new FileSplit(new Path("part-9"), 0, 0, new String[] 
{ "10.0.0.4", "10.0.0.5", "10.0.0.6" });
@@ -123,8 +121,8 @@
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = 
scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", 
"nc3", "nc2", "nc2", "nc3", "nc12",
-                "nc7", "nc7", "nc12" };
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", 
"nc3", "nc2", "nc2", "nc3", "nc12", "nc7",
+                "nc7", "nc12" };
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
         }
@@ -145,13 +143,8 @@
      * @throws Exception
      */
     public void testSchedulerSmallerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, 
NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new 
NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new 
NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new 
NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new 
NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new 
NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = 
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { 
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -191,13 +184,8 @@
      * @throws Exception
      */
     public void testSchedulerSmallerHDFSOdd() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, 
NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new 
NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new 
NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new 
NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new 
NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new 
NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = 
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         InputSplit[] fileSplits = new InputSplit[13];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { 
"10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -239,13 +227,8 @@
      * @throws Exception
      */
     public void testSchedulercBoundary() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, 
NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new 
NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new 
NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new 
NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new 
NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new 
NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = 
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         /** test empty file splits */
         InputSplit[] fileSplits = new InputSplit[0];
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
index c2ad4a0..8755cf3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
@@ -22,6 +22,12 @@
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.comm.NetworkAddress;
 
 public class TestUtils {
 
@@ -38,8 +44,8 @@
                     throw new Exception("Actual result changed at line " + num 
+ ":\n< " + lineExpected + "\n> ");
                 }
                 if (!equalStrings(lineExpected, lineActual)) {
-                    throw new Exception("Result for changed at line " + num + 
":\n< " + lineExpected + "\n> "
-                            + lineActual);
+                    throw new Exception(
+                            "Result for changed at line " + num + ":\n< " + 
lineExpected + "\n> " + lineActual);
                 }
                 ++num;
             }
@@ -94,4 +100,16 @@
         return true;
     }
 
+    /**
+     * Builds a map of node controller infos for scheduler tests.
+     * <p>
+     * For every {@code i} from 1 to {@code numberOfNodes} (inclusive) an entry is created whose name is
+     * {@code ncNamePrefix + i} and whose host string is {@code addressPrefix + i}. Each entry is
+     * {@link NodeStatus#ALIVE} and carries three {@link NetworkAddress} values on that host, built from
+     * {@code netPort}, {@code dataPort} and {@code messagingPort}, passed to the constructor in that order.
+     *
+     * @return the generated infos keyed by node name; empty when {@code numberOfNodes} is less than 1
+     */
+    public static Map<String, NodeControllerInfo> generateNodeControllerInfo(int numberOfNodes, String ncNamePrefix,
+            String addressPrefix, int netPort, int dataPort, int messagingPort) {
+        final Map<String, NodeControllerInfo> infosByName = new HashMap<>();
+        int nodeNumber = 1;
+        while (nodeNumber <= numberOfNodes) {
+            final String nodeName = ncNamePrefix + nodeNumber;
+            final String host = addressPrefix + nodeNumber;
+            final NetworkAddress netAddr = new NetworkAddress(host, netPort);
+            final NetworkAddress dataAddr = new NetworkAddress(host, dataPort);
+            final NetworkAddress messagingAddr = new NetworkAddress(host, messagingPort);
+            infosByName.put(nodeName,
+                    new NodeControllerInfo(nodeName, NodeStatus.ALIVE, netAddr, dataAddr, messagingAddr));
+            nodeNumber++;
+        }
+        return infosByName;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
index 6eabb71..793e029 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -19,22 +19,18 @@
 
 package org.apache.hyracks.hdfs2.scheduler;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
 import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.NodeStatus;
-import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.hdfs.utils.TestUtils;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
 
 /**
  * Test case for the new HDFS API scheduler
@@ -47,13 +43,8 @@
      * @throws Exception
      */
     public void testSchedulerSimple() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, 
NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new 
NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new 
NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new 
NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new 
NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new 
NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = 
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { 
"10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -79,13 +70,8 @@
      * @throws Exception
      */
     public void testSchedulerLargerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, 
NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new 
NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new 
NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new 
NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new 
NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new 
NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = 
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { 
"10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -118,13 +104,8 @@
      * @throws Exception
      */
     public void testSchedulerSmallerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, 
NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new 
NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new 
NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new 
NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new 
NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new 
NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = 
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { 
"10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -157,13 +138,8 @@
      * @throws Exception
      */
     public void testSchedulerSmallerHDFSOdd() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, 
NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new 
NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new 
NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new 
NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new 
NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new 
NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", 
NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new 
NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = 
TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { 
"10.0.0.1", "10.0.0.2", "10.0.0.3" }));
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 3c354f0..96b87fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -42,6 +42,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>0.2.18-SNAPSHOT</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 6d82297..100ed05 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -28,8 +28,12 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferFactory;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelReadInterface;
+import org.apache.hyracks.api.comm.IChannelWriteInterface;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.net.exceptions.NetException;
 
 /**
@@ -37,7 +41,7 @@
  *
  * @author vinayakb
  */
-public class ChannelControlBlock {
+public class ChannelControlBlock implements IChannelControlBlock {
     private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
 
     private final ChannelSet cSet;
@@ -71,20 +75,12 @@
         return channelId;
     }
 
-    /**
-     * Get the read inderface of this channel.
-     *
-     * @return the read interface.
-     */
+    @Override
     public IChannelReadInterface getReadInterface() {
         return ri;
     }
 
-    /**
-     * Get the write interface of this channel.
-     *
-     * @return the write interface.
-     */
+    @Override
     public IChannelWriteInterface getWriteInterface() {
         return wi;
     }
@@ -92,7 +88,7 @@
     private final class ReadInterface implements IChannelReadInterface {
         private final Deque<ByteBuffer> riEmptyStack;
 
-        private final IBufferAcceptor eba = new IBufferAcceptor() {
+        private IBufferAcceptor eba = new IBufferAcceptor() {
             @Override
             public void accept(ByteBuffer buffer) {
                 int delta = buffer.remaining();
@@ -114,6 +110,8 @@
 
         private IBufferFactory bufferFactory;
 
+        private boolean flushOnCompleteRead = false;
+
         ReadInterface() {
             riEmptyStack = new ArrayDeque<ByteBuffer>();
             credits = 0;
@@ -133,6 +131,16 @@
         @Override
         public IBufferAcceptor getEmptyBufferAcceptor() {
             return eba;
+        }
+
+        @Override
+        public void setEmptyBufferAcceptor(IBufferAcceptor eba) {
+            this.eba = eba;
+        }
+
+        @Override
+        public void setFlushOnCompleteRead(boolean flushOnCompleteRead) {
+            this.flushOnCompleteRead = flushOnCompleteRead;
         }
 
         int read(SocketChannel sc, int size) throws IOException, NetException {
@@ -168,7 +176,7 @@
                 } else {
                     return size;
                 }
-                if (currentReadBuffer.remaining() <= 0) {
+                if (currentReadBuffer.remaining() <= 0 || flushOnCompleteRead) {
                     flush();
                 }
             }
@@ -363,7 +371,8 @@
         this.ri.credits = credits;
     }
 
-    synchronized void addWriteCredits(int delta) {
+    @Override
+    public synchronized void addWriteCredits(int delta) {
         wi.credits += delta;
         wi.adjustChannelWritability();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
index f1f3c4a..bf01fdb 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
@@ -28,9 +28,8 @@
 import junit.framework.Assert;
 
 import org.junit.Test;
-
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/897
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I5c0bd7c11c1e78954ebceff49cb274d8073a64bd
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to