abdullah alamoudi has submitted this change and it was merged.

Change subject: Improve Messaging Connector and Add Unit Tests
......................................................................


Improve Messaging Connector and Add Unit Tests

Before this change, the messaging connector always reserved 100
bytes for messages, and that space was mostly unused. With this
change, it reserves only two bytes and sends null messages by
default. If a new message doesn't fit in the leftover space of a
frame, it sends the frame with a null message, followed by a
dedicated frame for the message.

Change-Id: If4336e9c234e8d282798cfba9f48432b46cccfca
Reviewed-on: https://asterix-gerrit.ics.uci.edu/880
Reviewed-by: Murtadha Hubail <hubail...@gmail.com>
Reviewed-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
---
M asterixdb/asterix-app/pom.xml
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java
R 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
23 files changed, 796 insertions(+), 95 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Jenkins: Looks good to me, but someone else must approve; Verified



diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 99f1c2f..76d05d5 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -286,5 +286,10 @@
       <artifactId>asterix-external-data</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+        <groupId>org.apache.hyracks</groupId>
+        <artifactId>hyracks-test-support</artifactId>
+        <version>0.2.18-SNAPSHOT</version>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
new file mode 100644
index 0000000..e267cc7
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.common;
+
+import java.util.Random;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.GrowableArray;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+/**
+ * Generates tuples filled with random values for use in tests. Every generated tuple has one field
+ * per entry of the {@link FieldType} array passed to the constructor, and each field is written with
+ * the serializer that matches its type.
+ */
+public class TestTupleGenerator {
+    private final int stringFieldSizes;
+    private final FieldType[] types;
+    private final boolean reuseObject;
+    private final Random random = new Random();
+    private final UTF8StringSerializerDeserializer stringSerde =
+            new UTF8StringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+    private ITupleReference tuple;
+    private GrowableArray[] fields;
+
+    /** The field types this generator knows how to produce. */
+    public enum FieldType {
+        Integer64,
+        Boolean,
+        Double,
+        String
+    }
+
+    /**
+     * @param types
+     *            the type of each field of the generated tuples, in field order
+     * @param stringFieldSizes
+     *            the number of random alphanumeric characters generated for each String field
+     * @param reuseObject
+     *            if true, {@link #next()} resets and refills the same tuple object on every call;
+     *            otherwise every call allocates and returns a new tuple object
+     */
+    public TestTupleGenerator(FieldType[] types, int stringFieldSizes, boolean reuseObject) {
+        this.types = types;
+        this.stringFieldSizes = stringFieldSizes;
+        this.reuseObject = reuseObject;
+        allocateTuple();
+    }
+
+    /**
+     * Produces the next random tuple.
+     *
+     * @return the tuple holding the newly generated field values
+     * @throws HyracksDataException
+     *             if serializing a field value fails
+     */
+    public ITupleReference next() throws HyracksDataException {
+        if (reuseObject) {
+            for (GrowableArray field : fields) {
+                field.reset();
+            }
+        } else {
+            allocateTuple();
+        }
+        for (int i = 0; i < types.length; i++) {
+            FieldType type = types[i];
+            switch (type) {
+                case Boolean:
+                    BooleanSerializerDeserializer.INSTANCE.serialize(random.nextBoolean(),
+                            fields[i].getDataOutput());
+                    break;
+                case Double:
+                    DoubleSerializerDeserializer.INSTANCE.serialize(random.nextDouble(),
+                            fields[i].getDataOutput());
+                    break;
+                case Integer64:
+                    Integer64SerializerDeserializer.INSTANCE.serialize(random.nextLong(),
+                            fields[i].getDataOutput());
+                    break;
+                case String:
+                    stringSerde.serialize(RandomStringUtils.randomAlphanumeric(stringFieldSizes),
+                            fields[i].getDataOutput());
+                    break;
+                default:
+                    // Fail fast instead of silently emitting an empty field when a new FieldType
+                    // is added without a matching serializer.
+                    throw new IllegalStateException("Unsupported field type: " + type);
+            }
+        }
+        return tuple;
+    }
+
+    /** Allocates a fresh set of empty field buffers and a tuple that wraps them. */
+    private void allocateTuple() {
+        fields = new GrowableArray[types.length];
+        for (int i = 0; i < types.length; i++) {
+            fields[i] = new GrowableArray();
+        }
+        tuple = new TestTupleReference(fields);
+    }
+
+    /** A tuple view over an array of field buffers; every field starts at offset 0 of its buffer. */
+    private static class TestTupleReference implements ITupleReference {
+        private final GrowableArray[] fields;
+
+        private TestTupleReference(GrowableArray[] fields) {
+            this.fields = fields;
+        }
+
+        @Override
+        public int getFieldCount() {
+            return fields.length;
+        }
+
+        @Override
+        public byte[] getFieldData(int fIdx) {
+            return fields[fIdx].getByteArray();
+        }
+
+        @Override
+        public int getFieldStart(int fIdx) {
+            return 0;
+        }
+
+        @Override
+        public int getFieldLength(int fIdx) {
+            return fields[fIdx].getLength();
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
new file mode 100644
index 0000000..2f712cc
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.test.common.TestTupleGenerator;
+import org.apache.asterix.test.common.TestTupleGenerator.FieldType;
+import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import 
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
+import 
org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link MToNPartitioningWithMessageConnectorDescriptor}. Each test installs a message
+ * frame as the shared object of the task context, drives the partitioner created by the connector,
+ * and then inspects the frames captured by the {@link TestFrameWriter} of each consumer.
+ */
+public class ConnectorDescriptorWithMessagingTest {
+
+    // Tuples used in this test are made of fields of {Integer64, Double, Boolean, UTF8String}
+    private static final int NUMBER_OF_CONSUMERS = 5;
+    private static final int DEFAULT_FRAME_SIZE = 32768;
+    private static final int CURRENT_PRODUCER = 0;
+    private static final int STRING_FIELD_SIZES = 32;
+
+    /**
+     * Flushes a partitioner that has received no tuples three times, changing the message type
+     * between flushes, and expects every consumer to receive one message-only frame per flush.
+     */
+    @Test
+    public void testEmptyFrames() throws Exception {
+        MToNPartitioningWithMessageConnectorDescriptor connector = createConnector(Arrays.asList(0, 1, 2, 3, 4));
+        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+        VSizeFrame message = new VSizeFrame(ctx);
+        VSizeFrame tempBuffer = new VSizeFrame(ctx);
+        ctx.setSharedObject(message);
+        setMessageType(message, MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+        RecordDescriptor rDesc = createRecordDescriptor();
+        TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+        IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+                CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+        partitioner.open();
+        FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+        List<TestFrameWriter> recipients = collectRecipients(partitionWriterFactory);
+
+        partitioner.flush();
+        assertEachReceivedMessage(recipients, fta, tempBuffer, 1, MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+
+        setMessageType(message, MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+        partitioner.flush();
+        assertEachReceivedMessage(recipients, fta, tempBuffer, 2,
+                MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+
+        setMessageType(message, MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+        partitioner.flush();
+        assertEachReceivedMessage(recipients, fta, tempBuffer, 3, MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+
+        partitioner.close();
+        assertEachClosed(recipients, 4);
+    }
+
+    /**
+     * Same sequence as {@link #testEmptyFrames()}, except that the first message is a snapshot
+     * message of {@code DEFAULT_FRAME_SIZE + 1} bytes, i.e. larger than a default-size frame.
+     */
+    @Test
+    public void testMessageLargerThanEmptyFrame() throws Exception {
+        MToNPartitioningWithMessageConnectorDescriptor connector = createConnector(Arrays.asList(0, 1, 2, 3, 4));
+        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+        VSizeFrame message = new VSizeFrame(ctx);
+        VSizeFrame tempBuffer = new VSizeFrame(ctx);
+        ctx.setSharedObject(message);
+        writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE + 1);
+        RecordDescriptor rDesc = createRecordDescriptor();
+        TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+        IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+                CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+        partitioner.open();
+        FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+        List<TestFrameWriter> recipients = collectRecipients(partitionWriterFactory);
+
+        partitioner.flush();
+        assertEachReceivedMessage(recipients, fta, tempBuffer, 1, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE);
+
+        setMessageType(message, MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+        partitioner.flush();
+        assertEachReceivedMessage(recipients, fta, tempBuffer, 2,
+                MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+
+        setMessageType(message, MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+        partitioner.flush();
+        assertEachReceivedMessage(recipients, fta, tempBuffer, 3, MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+
+        partitioner.close();
+        assertEachClosed(recipients, 4);
+    }
+
+    /**
+     * Fills the given frame with a message made of the tag byte followed by random bytes.
+     *
+     * @param frame
+     *            the message frame to overwrite; it is grown first if it is too small
+     * @param tag
+     *            the message type, written as the first byte
+     * @param size
+     *            the nominal message size; {@code size - 2} random bytes follow the tag
+     */
+    private void writeRandomMessage(VSizeFrame frame, byte tag, int size) throws HyracksDataException {
+        // We subtract 2: one byte for the tag, and one for the end offset
+        Random random = new Random();
+        byte[] bytes = new byte[size - 2];
+        random.nextBytes(bytes);
+        int frameSize = FrameHelper.calcAlignedFrameSizeToStore(1, size - 1, DEFAULT_FRAME_SIZE);
+        frame.ensureFrameSize(frameSize);
+        frame.getBuffer().clear();
+        frame.getBuffer().put(tag);
+        frame.getBuffer().put(bytes);
+        frame.getBuffer().flip();
+    }
+
+    /**
+     * Sends one full frame of tuples that are routed to consumers 1, 3 and 4 only, together with a
+     * snapshot message of {@code DEFAULT_FRAME_SIZE} bytes. Consumers 0 and 2 are expected to receive
+     * one frame and the others two; the last frame of every consumer must carry the message.
+     */
+    @Test
+    public void testMessageLargerThanSome() throws Exception {
+        // Routing will be to 1, 3, and 4 only. 0 and 2 will receive no tuples
+        MToNPartitioningWithMessageConnectorDescriptor connector = createConnector(Arrays.asList(1, 3, 4));
+        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+        VSizeFrame message = new VSizeFrame(ctx);
+        VSizeFrame tempBuffer = new VSizeFrame(ctx);
+        ctx.setSharedObject(message);
+        message.getBuffer().clear();
+        writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE);
+        FieldType[] types = { FieldType.Integer64, FieldType.Double, FieldType.Boolean, FieldType.String };
+        RecordDescriptor rDesc = createRecordDescriptor();
+        TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+        PartitionWithMessageDataWriter partitioner =
+                (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+                        CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+        partitioner.open();
+        FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+        List<TestFrameWriter> recipients = collectRecipients(partitionWriterFactory);
+
+        // Append tuples until the frame is full
+        TestTupleGenerator ttg = new TestTupleGenerator(types, STRING_FIELD_SIZES, true);
+        VSizeFrame frame = new VSizeFrame(ctx);
+        FrameTupleAppender appender = new FrameTupleAppender(frame);
+        ITupleReference tuple = ttg.next();
+        while (appender.append(tuple)) {
+            tuple = ttg.next();
+        }
+        partitioner.nextFrame(frame.getBuffer());
+        partitioner.flush();
+
+        assertFrameCounts(partitionWriterFactory, 1, 2, 1, 2, 2);
+        for (TestFrameWriter writer : recipients) {
+            assertLastFrame(writer, fta, tempBuffer, 1, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE);
+        }
+        partitioner.close();
+    }
+
+    /**
+     * Sends a frame holding one tuple per consumer together with a one-byte ACK_REQ message and
+     * expects every consumer to receive a single frame holding two tuples, one of them the message.
+     */
+    @Test
+    public void testMessageFitsWithTuples() throws Exception {
+        // Routing will be round robin
+        MToNPartitioningWithMessageConnectorDescriptor connector = createConnector(Arrays.asList(0, 1, 2, 3, 4));
+        IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
+        VSizeFrame message = new VSizeFrame(ctx);
+        VSizeFrame tempBuffer = new VSizeFrame(ctx);
+        ctx.setSharedObject(message);
+        setMessageType(message, MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+        FieldType[] types = { FieldType.Integer64, FieldType.Double, FieldType.Boolean, FieldType.String };
+        RecordDescriptor rDesc = createRecordDescriptor();
+        TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
+        PartitionWithMessageDataWriter partitioner =
+                (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
+                        CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+        partitioner.open();
+        FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
+        List<TestFrameWriter> recipients = collectRecipients(partitionWriterFactory);
+
+        TestTupleGenerator ttg = new TestTupleGenerator(types, STRING_FIELD_SIZES, true);
+        VSizeFrame frame = new VSizeFrame(ctx);
+        FrameTupleAppender appender = new FrameTupleAppender(frame);
+        for (int count = 0; count < NUMBER_OF_CONSUMERS; count++) {
+            ITupleReference tuple = ttg.next();
+            appender.append(tuple);
+        }
+        partitioner.nextFrame(frame.getBuffer());
+        partitioner.flush();
+
+        assertFrameCounts(partitionWriterFactory, 1, 1, 1, 1, 1);
+        for (TestFrameWriter writer : recipients) {
+            assertLastFrame(writer, fta, tempBuffer, 2, MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
+        }
+        partitioner.close();
+    }
+
+    /** Creates the connector under test with a partition computer that follows the given routing. */
+    private static MToNPartitioningWithMessageConnectorDescriptor createConnector(List<Integer> routing) {
+        IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class);
+        ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing);
+        return new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory);
+    }
+
+    /** Creates the record descriptor of the {Integer64, Double, Boolean, UTF8String} test tuples. */
+    private static RecordDescriptor createRecordDescriptor() {
+        ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
+                Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+                BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
+        return new RecordDescriptor(serdes);
+    }
+
+    /** Overwrites the message frame with a message that consists of the type byte only. */
+    private static void setMessageType(VSizeFrame message, byte type) {
+        message.getBuffer().clear();
+        message.getBuffer().put(type);
+        message.getBuffer().flip();
+    }
+
+    /** Returns the test writers that the factory has created so far. */
+    private static List<TestFrameWriter> collectRecipients(TestPartitionWriterFactory partitionWriterFactory) {
+        return new ArrayList<>(partitionWriterFactory.getWriters().values());
+    }
+
+    /**
+     * Asserts that every recipient has received {@code expectedFrameCount} frames so far and that its
+     * last frame holds exactly one tuple, which is a message of the expected type.
+     */
+    private static void assertEachReceivedMessage(List<TestFrameWriter> recipients, FrameTupleAccessor fta,
+            VSizeFrame tempBuffer, int expectedFrameCount, byte expectedMessageType) throws HyracksDataException {
+        for (TestFrameWriter writer : recipients) {
+            Assert.assertEquals(expectedFrameCount, writer.nextFrameCount());
+            assertLastFrame(writer, fta, tempBuffer, 1, expectedMessageType);
+        }
+    }
+
+    /**
+     * Asserts that the last frame received by the writer holds {@code expectedTupleCount} tuples and
+     * that the message extracted from it has the expected type.
+     */
+    private static void assertLastFrame(TestFrameWriter writer, FrameTupleAccessor fta, VSizeFrame tempBuffer,
+            int expectedTupleCount, byte expectedMessageType) throws HyracksDataException {
+        fta.reset(writer.getLastFrame());
+        Assert.assertEquals(expectedTupleCount, fta.getTupleCount());
+        FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
+        Assert.assertEquals(expectedMessageType, MessagingFrameTupleAppender.getMessageType(tempBuffer));
+    }
+
+    /** Asserts the number of frames received by the writers of consumers 0..n-1, in consumer order. */
+    private static void assertFrameCounts(TestPartitionWriterFactory partitionWriterFactory,
+            int... expectedFrameCounts) {
+        for (int i = 0; i < expectedFrameCounts.length; i++) {
+            Assert.assertEquals(expectedFrameCounts[i], partitionWriterFactory.getWriters().get(i).nextFrameCount());
+        }
+    }
+
+    /** Asserts that every recipient has the expected total frame count and was closed exactly once. */
+    private static void assertEachClosed(List<TestFrameWriter> recipients, int expectedFrameCount) {
+        for (TestFrameWriter writer : recipients) {
+            Assert.assertEquals(expectedFrameCount, writer.nextFrameCount());
+            Assert.assertEquals(1, writer.closeCount());
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java
new file mode 100644
index 0000000..385f6a2
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A partition computer factory for tests. The partition computers it creates ignore the tuple
+ * contents and return partitions by cycling through a fixed list of integers.
+ */
+public class TestPartitionComputerFactory implements ITuplePartitionComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final List<Integer> destinations;
+
+    /**
+     * @param destinations
+     *            the partitions to produce, in order; once the list is exhausted it starts over
+     */
+    public TestPartitionComputerFactory(List<Integer> destinations) {
+        this.destinations = destinations;
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+            // Each partition computer cycles over its own copy of the factory's list
+            private final List<Integer> route = new ArrayList<>(TestPartitionComputerFactory.this.destinations);
+            private Iterator<Integer> cursor = route.iterator();
+
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                if (route.isEmpty()) {
+                    // Nothing to cycle through: everything goes to partition 0
+                    return 0;
+                }
+                if (!cursor.hasNext()) {
+                    // Reached the end of the list: wrap around to its first entry
+                    cursor = route.iterator();
+                }
+                return cursor.next();
+            }
+        };
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java
new file mode 100644
index 0000000..4b4c722
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.IPartitionWriterFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.TestFrameWriter;
+
+/*
+ * A partition writer factory that is used for testing partitioners
+ */
+public class TestPartitionWriterFactory implements IPartitionWriterFactory {
+    private HashMap<Integer, TestFrameWriter> writers = new HashMap<>();
+
+    @Override
+    public IFrameWriter createFrameWriter(int receiverIndex) throws 
HyracksDataException {
+        // The created writers must retain a deep copy of the input frame
+        writers.put(receiverIndex, 
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), 
true));
+        return writers.get(receiverIndex);
+    }
+
+    public HashMap<Integer, TestFrameWriter> getWriters() {
+        return writers;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
similarity index 70%
rename from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
rename to 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
index 68783ca..536bf3a 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.util;
+package org.apache.asterix.test.dataflow;
 
-public class FeedMessageUtils {
-    public enum MessageType {
-        NULL,
-        ACK_REQUEST
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class TestRecordDescriptorFactory {
+    public RecordDescriptor 
createRecordDescriptor(ISerializerDeserializer<?>... serdes) {
+        return null;
     }
-
-    public static final byte NULL_FEED_MESSAGE = (byte) 
MessageType.NULL.ordinal();
-    public static final byte ACK_REQ_FEED_MESSAGE = (byte) 
MessageType.ACK_REQUEST.ordinal();
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index b09bef9..87daffa 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -67,18 +67,7 @@
                     continue;
                 }
                 tb.reset();
-                try {
-                    dataParser.parse(record, tb.getDataOutput());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
-                    feedLogManager.logRecord(record.toString(), 
ExternalDataConstants.ERROR_PARSE_RECORD);
-                    continue;
-                }
-                tb.addFieldEndOffset();
-                addMetaPart(tb, record);
-                addPrimaryKeys(tb, record);
-                tupleForwarder.addTuple(tb);
+                parseAndForward(record);
             }
         } catch (InterruptedException e) {
             //TODO: Find out what could cause an interrupted exception beside 
termination of a job/feed
@@ -107,6 +96,23 @@
         }
     }
 
+    private void parseAndForward(IRawRecord<? extends T> record) throws 
IOException {
+        synchronized (dataParser) {
+            try {
+                dataParser.parse(record, tb.getDataOutput());
+            } catch (Exception e) {
+                LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+                feedLogManager.logRecord(record.toString(), 
ExternalDataConstants.ERROR_PARSE_RECORD);
+                // continue the outer loop
+                return;
+            }
+            tb.addFieldEndOffset();
+            addMetaPart(tb, record);
+            addPrimaryKeys(tb, record);
+            tupleForwarder.addTuple(tb);
+        }
+    }
+
     protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> 
record) throws IOException {
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index ed5e355..0d72682 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -31,9 +31,8 @@
     private final AsterixInputStream stream;
 
     public FeedStreamDataFlowController(IHyracksTaskContext ctx, 
FeedTupleForwarder tupleForwarder,
-            FeedLogManager feedLogManager, int numOfFields, IStreamDataParser 
streamParser,
-            AsterixInputStream inputStream) {
-        super(ctx, tupleForwarder, feedLogManager, numOfFields);
+            FeedLogManager feedLogManager, IStreamDataParser streamParser, 
AsterixInputStream inputStream) {
+        super(ctx, tupleForwarder, feedLogManager, 1);
         this.dataParser = streamParser;
         this.stream = inputStream;
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 7ae2f41..f1eb870 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -19,14 +19,12 @@
 package org.apache.asterix.external.dataflow;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import javax.annotation.Nonnull;
 
 import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.FeedMessageUtils;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -35,6 +33,7 @@
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 
 public class FeedTupleForwarder implements ITupleForwarder {
 
@@ -60,10 +59,10 @@
             this.writer = writer;
             this.appender = new FrameTupleAppender(frame);
             // Set null feed message
-            ByteBuffer message = (ByteBuffer) ctx.getSharedObject();
+            VSizeFrame message = (VSizeFrame) ctx.getSharedObject();
             // a null message
-            message.put(FeedMessageUtils.NULL_FEED_MESSAGE);
-            message.flip();
+            
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
             initialized = true;
         }
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 5e8c022..6cdc2af 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -18,16 +18,15 @@
  */
 package org.apache.asterix.external.feed.runtime;
 
-import java.nio.ByteBuffer;
 import java.util.logging.Level;
 
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 
 public class IngestionRuntime extends SubscribableRuntime {
 
@@ -48,7 +47,7 @@
         dWriter.subscribe(collector);
         subscribers.add(collectionRuntime);
         if (numSubscribers == 0) {
-            
ctx.setSharedObject(ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE));
+            ctx.setSharedObject(new VSizeFrame(ctx));
             collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
             adapterRuntimeManager.start();
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 716468e..8d8bc28 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -34,13 +34,13 @@
 import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.util.FeedUtils;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /*
@@ -86,7 +86,7 @@
 
     private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE;
 
-    private ByteBuffer message = 
ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+    private final VSizeFrame message;
 
     private final FeedMetaOperatorDescriptor opDesc;
 
@@ -111,6 +111,7 @@
         this.connectionId = feedConnectionId;
         this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
+        this.message = new VSizeFrame(ctx);
         ctx.setSharedObject(message);
         this.opDesc = feedMetaOperatorDescriptor;
         this.recordDescProvider = recordDescProvider;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index b79707b..47df39e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -35,13 +35,13 @@
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends 
AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -87,7 +87,7 @@
 
     private final String targetId;
 
-    private final ByteBuffer message = 
ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE);
+    private final VSizeFrame message;
 
     private final IRecordDescriptorProvider recordDescProvider;
 
@@ -106,6 +106,7 @@
         this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.targetId = targetId;
+        this.message = new VSizeFrame(ctx);
         ctx.setSharedObject(message);
         this.recordDescProvider = recordDescProvider;
         this.opDesc = feedMetaOperatorDescriptor;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 6ba27d8..50ebb71 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -48,7 +48,6 @@
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedLogManager;
-import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -73,8 +72,8 @@
                                 DataflowUtils.getTupleForwarder(configuration, 
feedLogManager), dataParser,
                                 recordReader, ((IIndexingDatasource) 
recordReader).getIndexer());
                     } else if (isFeed) {
-                        FeedTupleForwarder tupleForwarder = 
(FeedTupleForwarder) DataflowUtils
-                                .getTupleForwarder(configuration, 
feedLogManager);
+                        FeedTupleForwarder tupleForwarder =
+                                (FeedTupleForwarder) 
DataflowUtils.getTupleForwarder(configuration, feedLogManager);
                         boolean isChangeFeed = 
ExternalDataUtils.isChangeFeed(configuration);
                         boolean isRecordWithMeta = 
ExternalDataUtils.isRecordWithMeta(configuration);
                         if (isRecordWithMeta) {
@@ -108,7 +107,7 @@
                     if (isFeed) {
                         return new FeedStreamDataFlowController(ctx,
                                 (FeedTupleForwarder) 
DataflowUtils.getTupleForwarder(configuration, feedLogManager),
-                                feedLogManager, 
FeedUtils.getNumOfFields(configuration), streamParser, stream);
+                                feedLogManager, streamParser, stream);
                     } else {
                         return new StreamDataFlowController(ctx, 
DataflowUtils.getTupleForwarder(configuration, null),
                                 streamParser);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 51e7e72..8228c39 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -32,13 +32,14 @@
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
 import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.util.IntSerDeUtils;
 import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.util.IntSerDeUtils;
 
 public class FeedUtils {
     private static String prepareDataverseFeedName(String dataverseName, 
String feedName) {
@@ -49,8 +50,8 @@
             ClusterPartition partition) {
         File relPathFile = new File(prepareDataverseFeedName(dataverseName, 
feedName));
         String storageDirName = 
AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
-        String storagePartitionPath = 
StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                partition.getPartitionId());
+        String storagePartitionPath =
+                StoragePathUtil.prepareStoragePartitionPath(storageDirName, 
partition.getPartitionId());
         // Note: feed adapter instances in a single node share the feed logger
         // format: 'storage dir name'/partition_#/dataverse/feed/node
         File f = new File(storagePartitionPath + File.separator + relPathFile 
+ File.separator + nodeName);
@@ -88,20 +89,19 @@
                 feedLogFileSplit.getIODeviceId(), 
ctx.getIOManager()).getFile());
     }
 
-    public static void processFeedMessage(ByteBuffer input, ByteBuffer 
message, FrameTupleAccessor fta) {
+    public static void processFeedMessage(ByteBuffer input, VSizeFrame 
message, FrameTupleAccessor fta)
+            throws HyracksDataException {
         // read the message and reduce the number of tuples
         fta.reset(input);
         int tc = fta.getTupleCount() - 1;
         int offset = fta.getTupleStartOffset(tc);
         int len = fta.getTupleLength(tc);
-        message.clear();
-        message.put(input.array(), offset, len);
-        message.flip();
+        int newSize = FrameHelper.calcAlignedFrameSizeToStore(1, len, 
message.getMinSize());
+        message.ensureFrameSize(newSize);
+        message.getBuffer().clear();
+        message.getBuffer().put(input.array(), offset, len);
+        message.getBuffer().flip();
         IntSerDeUtils.putInt(input.array(), 
FrameHelper.getTupleCountOffset(input.capacity()), tc);
-    }
-
-    public static int getNumOfFields(Map<String, String> configuration) {
-        return 1;
     }
 
     public static String getFeedMetaTypeName(Map<String, String> 
configuration) {
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
index c00db7a..3becd96 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
@@ -118,7 +118,8 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, false, NUM_FRAMES * 
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = 
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), 
Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
@@ -159,7 +160,8 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, false, NUM_FRAMES * 
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = 
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), 
Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
0, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
@@ -207,7 +209,7 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * 
numberOfSpillFrames, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool =
@@ -317,7 +319,7 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * 
numberOfSpillFrames, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool =
@@ -390,7 +392,7 @@
             // Spill budget = Memory budget, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, 
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool =
@@ -452,7 +454,7 @@
             // Spill budget = Memory budget, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, 
DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool =
@@ -509,7 +511,7 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * 
NUM_FRAMES, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
@@ -553,7 +555,8 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 
0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = 
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), 
Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
@@ -593,7 +596,8 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 
0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestFrameWriter writer = 
FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList());
+            TestFrameWriter writer =
+                    FrameWriterTestUtils.create(Collections.emptyList(), 
Collections.emptyList(), false);
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
             FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, 
fpa, framePool);
@@ -632,7 +636,7 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 
0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
@@ -682,7 +686,7 @@
             FeedPolicyAccessor fpa =
                     createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * 
NUM_FRAMES, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
@@ -732,7 +736,7 @@
             // No spill, No discard
             FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 
0L, DISCARD_ALLOWANCE);
             // Non-Active Writer
-            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE);
+            TestControlledFrameWriter writer = 
FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false);
             writer.freeze();
             // FramePool
             ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 
FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
index 4bddfa9..c9cc71e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -33,7 +33,7 @@
     }
 
     public static TestFrameWriter create(Collection<FrameWriterOperation> 
exceptionThrowingOperations,
-            Collection<FrameWriterOperation> errorThrowingOperations) {
+            Collection<FrameWriterOperation> errorThrowingOperations, boolean 
deepCopyInputFrames) {
         CountAnswer openAnswer =
                 createAnswer(FrameWriterOperation.Open, 
exceptionThrowingOperations, errorThrowingOperations);
         CountAnswer nextAnswer =
@@ -44,7 +44,7 @@
                 createAnswer(FrameWriterOperation.Fail, 
exceptionThrowingOperations, errorThrowingOperations);
         CountAnswer closeAnswer =
                 createAnswer(FrameWriterOperation.Close, 
exceptionThrowingOperations, errorThrowingOperations);
-        return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, 
failAnswer, closeAnswer);
+        return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, 
failAnswer, closeAnswer, deepCopyInputFrames);
     }
 
     public static CountAnswer createAnswer(FrameWriterOperation operation,
@@ -59,7 +59,7 @@
         }
     }
 
-    public static TestControlledFrameWriter create(int initialFrameSize) {
-        return new TestControlledFrameWriter(initialFrameSize);
+    public static TestControlledFrameWriter create(int initialFrameSize, 
boolean deepCopyInputFrames) {
+        return new TestControlledFrameWriter(initialFrameSize, 
deepCopyInputFrames);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
index bf168c2..b98bcf7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
@@ -30,8 +30,9 @@
     private volatile int currentMultiplier = 0;
     private volatile int kicks = 0;
 
-    public TestControlledFrameWriter(int initialFrameSize) {
-        super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new 
CountAnswer(), new CountAnswer());
+    public TestControlledFrameWriter(int initialFrameSize, boolean 
deepCopyInputFrames) {
+        super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new 
CountAnswer(), new CountAnswer(),
+                deepCopyInputFrames);
         this.initialFrameSize = initialFrameSize;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
index b3492fe..065e64d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
@@ -29,19 +29,27 @@
     private final CountAnswer flushAnswer;
     private final CountAnswer failAnswer;
     private final CountAnswer closeAnswer;
+    private static final int BYTES32KB = 32768;
     private long openDuration = 0L;
     private long nextDuration = 0L;
     private long flushDuration = 0L;
     private long failDuration = 0L;
     private long closeDuration = 0L;
+    // If deepCopyFrames is set, we take a copy of the frame; otherwise, we 
simply point lastFrame to it
+    private final boolean deepCopyFrames;
+    private ByteBuffer lastFrame;
 
     public TestFrameWriter(CountAnswer openAnswer, CountAnswer nextAnswer, 
CountAnswer flushAnswer,
-            CountAnswer failAnswer, CountAnswer closeAnswer) {
+            CountAnswer failAnswer, CountAnswer closeAnswer, boolean 
deepCopyFrames) {
         this.openAnswer = openAnswer;
         this.nextAnswer = nextAnswer;
         this.closeAnswer = closeAnswer;
         this.flushAnswer = flushAnswer;
         this.failAnswer = failAnswer;
+        this.deepCopyFrames = deepCopyFrames;
+        if (deepCopyFrames) {
+            lastFrame = ByteBuffer.allocate(BYTES32KB);
+        }
     }
 
     @Override
@@ -56,6 +64,15 @@
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        if (deepCopyFrames) {
+            if (lastFrame.capacity() != buffer.capacity()) {
+                lastFrame = ByteBuffer.allocate(buffer.capacity());
+            }
+            lastFrame.clear();
+            lastFrame.put(buffer.array());
+        } else {
+            lastFrame = buffer;
+        }
         delay(nextDuration);
         nextAnswer.call();
     }
@@ -170,4 +187,8 @@
             }
         }
     }
+
+    public ByteBuffer getLastFrame() {
+        return lastFrame;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 1623035..ef11b5b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameTupleAppender;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 public class FrameTupleAppender extends AbstractFrameAppender implements 
IFrameTupleAppender {
@@ -51,6 +52,30 @@
             }
             System.arraycopy(bytes, offset, array, tupleDataEndOffset + 
fieldSlots.length * 4, length);
             tupleDataEndOffset += fieldSlots.length * 4 + length;
+            IntSerDeUtils.putInt(getBuffer().array(),
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            IntSerDeUtils.putInt(getBuffer().array(), 
FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+                    tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean append(ITupleReference tuple) throws HyracksDataException {
+        int tupleSize = 0;
+        for (int i = 0; i < tuple.getFieldCount(); i++) {
+            tupleSize += tuple.getFieldLength(i);
+        }
+        if (canHoldNewTuple(tuple.getFieldCount(), tupleSize)) {
+            int offset = 0;
+            for (int i = 0; i < tuple.getFieldCount(); ++i) {
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, 
offset);
+                System.arraycopy(tuple.getFieldData(i), 
tuple.getFieldStart(i), array,
+                        tupleDataEndOffset + tuple.getFieldCount() * 4, 
tuple.getFieldLength(i));
+                offset += tuple.getFieldLength(i);
+            }
+            tupleDataEndOffset += tuple.getFieldCount() * 4 + tupleSize;
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 
* (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
@@ -252,8 +277,8 @@
             int fEndOffset = 0;
             for (int i = 0; i < fields.length; ++i) {
                 int fSrcStart = tStartOffset + fSrcSlotsLength + 
accessor.getFieldStartOffset(tIndex, fields[i]);
-                int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
-                        - accessor.getFieldStartOffset(tIndex, fields[i]);
+                int fLen =
+                        accessor.getFieldEndOffset(tIndex, fields[i]) - 
accessor.getFieldStartOffset(tIndex, fields[i]);
                 System.arraycopy(accessor.getBuffer().array(), fSrcStart, 
array,
                         tupleDataEndOffset + fTargetSlotsLength + 
fStartOffset, fLen);
                 fEndOffset += fLen;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
index 77495dd..e57e12d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
@@ -71,19 +71,6 @@
         FrameUtils.appendSkipEmptyFieldToWriter(outputWriter, 
frameTupleAppender, fieldSlots, bytes, offset, length);
     }
 
-    public void append(byte[] bytes, int offset, int length) throws 
HyracksDataException {
-        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, bytes, 
offset, length);
-    }
-
-    public void append(IFrameTupleAccessor tupleAccessor, int tStartOffset, 
int tEndOffset)
-            throws HyracksDataException {
-        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, 
tupleAccessor, tStartOffset, tEndOffset);
-    }
-
-    public void append(IFrameTupleAccessor tupleAccessor, int tIndex) throws 
HyracksDataException {
-        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, 
tupleAccessor, tIndex);
-    }
-
     public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, 
IFrameTupleAccessor accessor1, int tIndex1)
             throws HyracksDataException {
         FrameUtils.appendConcatToWriter(outputWriter, frameTupleAppender, 
accessor0, tIndex0, accessor1, tIndex1);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 345c506..cae659d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -18,32 +18,81 @@
  */
 package org.apache.hyracks.dataflow.common.io;
 
+import java.io.PrintStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.util.IntSerDeUtils;
 
+/**
+ * A frame tuple appender that appends messages stored in the task context 
when pushing frames forward.
+ * This appender must only be used on a network boundary.
+ */
 public class MessagingFrameTupleAppender extends FrameTupleAppender {
 
-    public static final int MAX_MESSAGE_SIZE = 100;
     private final IHyracksTaskContext ctx;
+    private static final int NULL_MESSAGE_SIZE = 1;
+    public static final byte NULL_FEED_MESSAGE = 0x01;
+    public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
+    public static final byte SNAPSHOT_MESSAGE = 0x03;
 
     public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
         this.ctx = ctx;
     }
 
+    public static void printMessage(VSizeFrame message, PrintStream out) 
throws HyracksDataException {
+        out.println(getMessageString(message));
+    }
+
+    public static String getMessageString(VSizeFrame message) throws 
HyracksDataException {
+        StringBuilder aString = new StringBuilder();
+        aString.append("Message Type: ");
+        switch (getMessageType(message)) {
+            case NULL_FEED_MESSAGE:
+                aString.append("Null, ");
+                break;
+            case ACK_REQ_FEED_MESSAGE:
+                aString.append("Ack Request, ");
+                break;
+            case SNAPSHOT_MESSAGE:
+                aString.append("Snapshot, ");
+                break;
+            default:
+                aString.append("Unknown, ");
+                break;
+        }
+        aString.append("Message Length: ");
+        int messageLength = message.getBuffer().remaining();
+        aString.append(messageLength);
+        return aString.toString();
+    }
+
+    public static byte getMessageType(VSizeFrame message) throws 
HyracksDataException {
+        switch (message.getBuffer().array()[0]) {
+            case NULL_FEED_MESSAGE:
+                return NULL_FEED_MESSAGE;
+            case ACK_REQ_FEED_MESSAGE:
+                return ACK_REQ_FEED_MESSAGE;
+            case SNAPSHOT_MESSAGE:
+                return SNAPSHOT_MESSAGE;
+            default:
+                throw new HyracksDataException("Unknown message type");
+        }
+    }
+
     @Override
     protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws 
HyracksDataException {
-        if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+        if (hasEnoughSpace(fieldCount + 1, dataLength + NULL_MESSAGE_SIZE)) {
             return true;
         }
         if (tupleCount == 0) {
-            
frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, 
dataLength + MAX_MESSAGE_SIZE,
-                    frame.getMinSize()));
+            
frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount + 1,
+                    dataLength + NULL_MESSAGE_SIZE, frame.getMinSize()));
             reset(frame.getBuffer(), true);
             return true;
         }
@@ -52,13 +101,32 @@
 
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws 
HyracksDataException {
-        appendMessage((ByteBuffer) ctx.getSharedObject());
+        // If the message fits, we append it; otherwise, we append a null message, 
then send a message-only
+        // frame containing the message
+        ByteBuffer message = ((VSizeFrame) ctx.getSharedObject()).getBuffer();
+        int messageSize = message.limit() - message.position();
+        if (hasEnoughSpace(1, messageSize)) {
+            appendMessage(message);
+            forward(outWriter);
+        } else {
+            if (tupleCount > 0) {
+                appendNullMessage();
+                forward(outWriter);
+            }
+            if (!hasEnoughSpace(1, messageSize)) {
+                
frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, 
frame.getMinSize()));
+                reset(frame.getBuffer(), true);
+            }
+            appendMessage(message);
+            forward(outWriter);
+        }
+    }
+
+    private void forward(IFrameWriter outWriter) throws HyracksDataException {
         getBuffer().clear();
         outWriter.nextFrame(getBuffer());
-        if (clearFrame) {
-            frame.reset();
-            reset(getBuffer(), true);
-        }
+        frame.reset();
+        reset(getBuffer(), true);
     }
 
     private void appendMessage(ByteBuffer message) {
@@ -69,4 +137,13 @@
         ++tupleCount;
         IntSerDeUtils.putInt(getBuffer().array(), 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
     }
+
+    private void appendNullMessage() {
+        array[tupleDataEndOffset] = NULL_FEED_MESSAGE;
+        tupleDataEndOffset++;
+        IntSerDeUtils.putInt(getBuffer().array(),
+                FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * 
(tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        IntSerDeUtils.putInt(getBuffer().array(), 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
index e90d8b0..f6996f1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -30,6 +30,11 @@
 
     private static final long serialVersionUID = 1L;
 
+    /**
+     * This connector enables sending messages alongside data tuples. Messages 
are sent on flush() calls.
+     * It broadcasts messages to all consumers. If the message doesn't fit in 
the current frame for a specific
+     * receiver, the current frame is sent, followed by a subsequent frame that 
contains only the message.
+     */
     public 
MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry 
spec,
             ITuplePartitionComputerFactory tpcf) {
         super(spec, tpcf);
@@ -38,7 +43,7 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, 
RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int 
nProducerPartitions, int nConsumerPartitions)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, 
edwFactory, recordDesc,
                 tpcf.createPartitioner());
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 98bd860..f2ec64b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -44,6 +44,7 @@
     private WorkspaceFileFactory fileFactory;
 
     private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
+    private Object sharedObject;
 
     public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId 
taskId) {
         this.jobletContext = jobletContext;
@@ -149,10 +150,11 @@
 
     @Override
     public Object getSharedObject() {
-        return null;
+        return sharedObject;
     }
 
     @Override
     public void setSharedObject(Object sharedObject) {
+        this.sharedObject = sharedObject;
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/880
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: If4336e9c234e8d282798cfba9f48432b46cccfca
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <hubail...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to