abdullah alamoudi has submitted this change and it was merged.

Change subject: Improve Messaging Connector and Add Unit Tests
......................................................................
Improve Messaging Connector and Add Unit Tests

Before this change, the messaging connector always reserved 100 bytes for messages, which were mostly unused. With this change, it reserves only two bytes and sends null messages by default. If a new message does not fit in the leftover space of a frame, it sends the frame with a null message, followed by a dedicated frame for the message.

Change-Id: If4336e9c234e8d282798cfba9f48432b46cccfca
Reviewed-on: https://asterix-gerrit.ics.uci.edu/880
Reviewed-by: Murtadha Hubail <hubail...@gmail.com>
Reviewed-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
---
M asterixdb/asterix-app/pom.xml
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java
R asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java
M hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
M hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java
M hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
23 files changed, 796 insertions(+), 95 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Jenkins: Looks good to me, but someone else must approve; Verified


diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 99f1c2f..76d05d5 100644
--- a/asterixdb/asterix-app/pom.xml
+++
b/asterixdb/asterix-app/pom.xml @@ -286,5 +286,10 @@ <artifactId>asterix-external-data</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-test-support</artifactId> + <version>0.2.18-SNAPSHOT</version> + </dependency> </dependencies> </project> diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java new file mode 100644 index 0000000..e267cc7 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.common; + +import java.util.Random; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.util.GrowableArray; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; +import org.apache.hyracks.util.string.UTF8StringReader; +import org.apache.hyracks.util.string.UTF8StringWriter; + +public class TestTupleGenerator { + private final int stringFieldSizes; + private final FieldType[] types; + private final boolean reuseObject; + private final Random random = new Random(); + private ITupleReference tuple; + private UTF8StringSerializerDeserializer stringSerde = + new UTF8StringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader()); + private GrowableArray[] fields; + + public enum FieldType { + Integer64, + Boolean, + Double, + String + } + + public TestTupleGenerator(FieldType[] types, int stringFieldSizes, boolean resueObject) { + this.types = types; + this.stringFieldSizes = stringFieldSizes; + this.reuseObject = resueObject; + this.fields = new GrowableArray[types.length]; + for (int i = 0; i < types.length; i++) { + fields[i] = new GrowableArray(); + } + tuple = new TestTupleReference(fields); + } + + public ITupleReference next() throws HyracksDataException { + if (reuseObject) { + for (int i = 0; i < types.length; i++) { + fields[i].reset(); + } + } else { + this.fields = new GrowableArray[types.length]; + for (int i = 0; i < types.length; i++) { + fields[i] = new GrowableArray(); + } + tuple = new TestTupleReference(fields); + } + for (int i = 0; i < types.length; 
i++) { + FieldType type = types[i]; + switch (type) { + case Boolean: + Boolean aBoolean = random.nextBoolean(); + BooleanSerializerDeserializer.INSTANCE.serialize(aBoolean, fields[i].getDataOutput()); + break; + case Double: + double aDouble = random.nextDouble(); + DoubleSerializerDeserializer.INSTANCE.serialize(aDouble, fields[i].getDataOutput()); + break; + case Integer64: + long aLong = random.nextLong(); + Integer64SerializerDeserializer.INSTANCE.serialize(aLong, fields[i].getDataOutput()); + break; + case String: + String aString = RandomStringUtils.randomAlphanumeric(stringFieldSizes); + stringSerde.serialize(aString, fields[i].getDataOutput()); + break; + default: + break; + } + } + return tuple; + } + + private class TestTupleReference implements ITupleReference { + private final GrowableArray[] fields; + + private TestTupleReference(GrowableArray[] fields) { + this.fields = fields; + } + + @Override + public int getFieldCount() { + return fields.length; + } + + @Override + public byte[] getFieldData(int fIdx) { + + return fields[fIdx].getByteArray(); + } + + @Override + public int getFieldStart(int fIdx) { + return 0; + } + + @Override + public int getFieldLength(int fIdx) { + return fields[fIdx].getLength(); + } + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java new file mode 100644 index 0000000..2f712cc --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.dataflow; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.asterix.external.util.FeedUtils; +import org.apache.asterix.test.common.TestTupleGenerator; +import org.apache.asterix.test.common.TestTupleGenerator.FieldType; +import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; +import org.apache.hyracks.api.test.TestFrameWriter; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; +import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; +import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor; +import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter; +import org.apache.hyracks.test.support.TestUtils; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class ConnectorDescriptorWithMessagingTest { + + // Tuples used in this test are made of Fields of {Integer,Double,Boolean,UTF8String}; + private static final int NUMBER_OF_CONSUMERS = 5; + private static final int DEFAULT_FRAME_SIZE = 32768; + private static final int CURRENT_PRODUCER = 0; + private static final int STRING_FIELD_SIZES = 32; + + @Test + public void testEmptyFrames() throws Exception { + try { + List<Integer> routing = Arrays.asList(0, 1, 2, 3, 4); + IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class); + ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing); + MToNPartitioningWithMessageConnectorDescriptor connector = + new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory); + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + VSizeFrame message = new VSizeFrame(ctx); + VSizeFrame tempBuffer = new VSizeFrame(ctx); + ctx.setSharedObject(message); + message.getBuffer().clear(); + message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); + message.getBuffer().flip(); + ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] { + Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE, + BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() }; + RecordDescriptor rDesc = new RecordDescriptor(serdes); + TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory(); + IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, 
partitionWriterFactory, CURRENT_PRODUCER, + NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS); + partitioner.open(); + FrameTupleAccessor fta = new FrameTupleAccessor(rDesc); + List<TestFrameWriter> recipients = new ArrayList<>(); + for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) { + recipients.add((TestFrameWriter) writer); + } + partitioner.flush(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 1); + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + message.getBuffer().clear(); + message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE); + message.getBuffer().flip(); + partitioner.flush(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 2); + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + + message.getBuffer().clear(); + message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); + message.getBuffer().flip(); + partitioner.flush(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 3); + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + partitioner.close(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 4); + Assert.assertEquals(writer.closeCount(), 1); + } + + } catch (Throwable th) { + th.printStackTrace(); + throw th; + } + } + + @Test + public void testMessageLargerThanEmptyFrame() throws Exception { + try { + List<Integer> routing = Arrays.asList(0, 1, 2, 3, 4); + IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class); + ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing); + MToNPartitioningWithMessageConnectorDescriptor connector = + new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory); + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + VSizeFrame message = new VSizeFrame(ctx); + VSizeFrame tempBuffer = new VSizeFrame(ctx); + ctx.setSharedObject(message); + writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE + 1); + ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] { + Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE, + BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() }; + RecordDescriptor rDesc = new RecordDescriptor(serdes); + TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory(); + IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER, + NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS); + partitioner.open(); + FrameTupleAccessor fta = new FrameTupleAccessor(rDesc); + List<TestFrameWriter> recipients = new 
ArrayList<>(); + for (IFrameWriter writer : partitionWriterFactory.getWriters().values()) { + recipients.add((TestFrameWriter) writer); + } + partitioner.flush(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 1); + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + message.getBuffer().clear(); + message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE); + message.getBuffer().flip(); + partitioner.flush(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 2); + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + message.getBuffer().clear(); + message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); + message.getBuffer().flip(); + partitioner.flush(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 3); + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.NULL_FEED_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + partitioner.close(); + for (TestFrameWriter writer : recipients) { + Assert.assertEquals(writer.nextFrameCount(), 4); + Assert.assertEquals(writer.closeCount(), 1); + } + } catch (Throwable th) { + th.printStackTrace(); + throw th; + } + } + + private void writeRandomMessage(VSizeFrame frame, byte tag, int size) throws HyracksDataException { + // We subtract 2, 1 for the tag, and one for the end offset + Random random = new Random(); + byte[] bytes = new byte[size - 2]; + random.nextBytes(bytes); + int frameSize = FrameHelper.calcAlignedFrameSizeToStore(1, size - 1, DEFAULT_FRAME_SIZE); + frame.ensureFrameSize(frameSize); + frame.getBuffer().clear(); + frame.getBuffer().put(tag); + frame.getBuffer().put(bytes); + frame.getBuffer().flip(); + } + + @Test + public void testMessageLargerThanSome() throws Exception { + try { + // Routing will be to 1, 3, and 4 only. 
0 and 2 will receive no tuples + List<Integer> routing = Arrays.asList(1, 3, 4); + IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class); + ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing); + MToNPartitioningWithMessageConnectorDescriptor connector = + new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory); + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); + VSizeFrame message = new VSizeFrame(ctx); + VSizeFrame tempBuffer = new VSizeFrame(ctx); + ctx.setSharedObject(message); + message.getBuffer().clear(); + writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE); + ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] { + Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE, + BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() }; + FieldType[] types = { FieldType.Integer64, FieldType.Double, FieldType.Boolean, FieldType.String }; + RecordDescriptor rDesc = new RecordDescriptor(serdes); + TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory(); + PartitionWithMessageDataWriter partitioner = + (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory, + CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS); + partitioner.open(); + FrameTupleAccessor fta = new FrameTupleAccessor(rDesc); + List<TestFrameWriter> recipients = new ArrayList<>(); + for (int i = 0; i < partitionWriterFactory.getWriters().values().size(); i++) { + recipients.add(partitionWriterFactory.getWriters().get(i)); + } + TestTupleGenerator ttg = new TestTupleGenerator(types, STRING_FIELD_SIZES, true); + VSizeFrame frame = new VSizeFrame(ctx); + FrameTupleAppender appender = new FrameTupleAppender(frame); + ITupleReference tuple = ttg.next(); + while (appender.append(tuple)) { + tuple = ttg.next(); + } + partitioner.nextFrame(frame.getBuffer()); + partitioner.flush(); + Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1); + Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 2); + Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1); + Assert.assertEquals(partitionWriterFactory.getWriters().get(3).nextFrameCount(), 2); + Assert.assertEquals(partitionWriterFactory.getWriters().get(4).nextFrameCount(), 2); + for (TestFrameWriter writer : recipients) { + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 1); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + partitioner.close(); + } catch (Throwable th) { + th.printStackTrace(); + throw th; + } + } + + @Test + public void testMessageFitsWithTuples() throws Exception { + try { + // Routing will be round robin + List<Integer> routing = Arrays.asList(0, 1, 2, 3, 4); + IConnectorDescriptorRegistry connDescRegistry = Mockito.mock(IConnectorDescriptorRegistry.class); + ITuplePartitionComputerFactory partitionComputerFactory = new TestPartitionComputerFactory(routing); + MToNPartitioningWithMessageConnectorDescriptor connector = + new MToNPartitioningWithMessageConnectorDescriptor(connDescRegistry, partitionComputerFactory); + IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); 
+ VSizeFrame message = new VSizeFrame(ctx); + VSizeFrame tempBuffer = new VSizeFrame(ctx); + ctx.setSharedObject(message); + message.getBuffer().clear(); + message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE); + message.getBuffer().flip(); + ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] { + Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE, + BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() }; + FieldType[] types = { FieldType.Integer64, FieldType.Double, FieldType.Boolean, FieldType.String }; + RecordDescriptor rDesc = new RecordDescriptor(serdes); + TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory(); + PartitionWithMessageDataWriter partitioner = + (PartitionWithMessageDataWriter) connector.createPartitioner(ctx, rDesc, partitionWriterFactory, + CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS); + partitioner.open(); + FrameTupleAccessor fta = new FrameTupleAccessor(rDesc); + List<TestFrameWriter> recipients = new ArrayList<>(); + for (int i = 0; i < partitionWriterFactory.getWriters().values().size(); i++) { + recipients.add(partitionWriterFactory.getWriters().get(i)); + } + TestTupleGenerator ttg = new TestTupleGenerator(types, STRING_FIELD_SIZES, true); + VSizeFrame frame = new VSizeFrame(ctx); + FrameTupleAppender appender = new FrameTupleAppender(frame); + for (int count = 0; count < NUMBER_OF_CONSUMERS; count++) { + ITupleReference tuple = ttg.next(); + appender.append(tuple); + } + partitioner.nextFrame(frame.getBuffer()); + partitioner.flush(); + Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1); + Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 1); + Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1); + Assert.assertEquals(partitionWriterFactory.getWriters().get(3).nextFrameCount(), 1); + Assert.assertEquals(partitionWriterFactory.getWriters().get(4).nextFrameCount(), 1); + for (TestFrameWriter writer : recipients) { + fta.reset(writer.getLastFrame()); + Assert.assertEquals(fta.getTupleCount(), 2); + FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta); + Assert.assertEquals(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE, + MessagingFrameTupleAppender.getMessageType(tempBuffer)); + } + partitioner.close(); + } catch (Throwable th) { + th.printStackTrace(); + throw th; + } + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java new file mode 100644 index 0000000..385f6a2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionComputerFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.dataflow; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hyracks.api.comm.IFrameTupleAccessor; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer; +import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class TestPartitionComputerFactory implements ITuplePartitionComputerFactory { + + private static final long serialVersionUID = 1L; + private final List<Integer> destinations; + + /* + * For test purposes, this partition computer produces partitions according to a passed in + * list of integers. + */ + public TestPartitionComputerFactory(List<Integer> destinations) { + this.destinations = destinations; + } + + @Override + public ITuplePartitionComputer createPartitioner() { + return new ITuplePartitionComputer() { + private final List<Integer> destinations = + new ArrayList<Integer>(TestPartitionComputerFactory.this.destinations); + private Iterator<Integer> iterator = destinations.iterator(); + + @Override + public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException { + if (destinations.size() == 0) { + return 0; + } + while (!iterator.hasNext()) { + iterator = destinations.iterator(); + } + return iterator.next(); + } + }; + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java new file mode 100644 index 0000000..4b4c722 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPartitionWriterFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.dataflow; + +import java.util.Collections; +import java.util.HashMap; + +import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.IPartitionWriterFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.test.FrameWriterTestUtils; +import org.apache.hyracks.api.test.TestFrameWriter; + +/* + * A partition writer factory that is used for testing partitioners + */ +public class TestPartitionWriterFactory implements IPartitionWriterFactory { + private HashMap<Integer, TestFrameWriter> writers = new HashMap<>(); + + @Override + public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException { + // The created writers must retain a deep copy of the input frame + writers.put(receiverIndex, FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), true)); + return writers.get(receiverIndex); + } + + public HashMap<Integer, TestFrameWriter> getWriters() { + return writers; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java similarity index 70% rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java rename to asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java index 68783ca..536bf3a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java @@ -16,14 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.external.util; +package org.apache.asterix.test.dataflow; -public class FeedMessageUtils { - public enum MessageType { - NULL, - ACK_REQUEST +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; + +public class TestRecordDescriptorFactory { + public RecordDescriptor createRecordDescriptor(ISerializerDeserializer<?>... 
serdes) { + return null; } - - public static final byte NULL_FEED_MESSAGE = (byte) MessageType.NULL.ordinal(); - public static final byte ACK_REQ_FEED_MESSAGE = (byte) MessageType.ACK_REQUEST.ordinal(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index b09bef9..87daffa 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -67,18 +67,7 @@ continue; } tb.reset(); - try { - dataParser.parse(record, tb.getDataOutput()); - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e); - feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD); - continue; - } - tb.addFieldEndOffset(); - addMetaPart(tb, record); - addPrimaryKeys(tb, record); - tupleForwarder.addTuple(tb); + parseAndForward(record); } } catch (InterruptedException e) { //TODO: Find out what could cause an interrupted exception beside termination of a job/feed @@ -107,6 +96,23 @@ } } + private void parseAndForward(IRawRecord<? extends T> record) throws IOException { + synchronized (dataParser) { + try { + dataParser.parse(record, tb.getDataOutput()); + } catch (Exception e) { + LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e); + feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD); + // continue the outer loop + return; + } + tb.addFieldEndOffset(); + addMetaPart(tb, record); + addPrimaryKeys(tb, record); + tupleForwarder.addTuple(tb); + } + } + protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? 
extends T> record) throws IOException { } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java index ed5e355..0d72682 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java @@ -31,9 +31,8 @@ private final AsterixInputStream stream; public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder, - FeedLogManager feedLogManager, int numOfFields, IStreamDataParser streamParser, - AsterixInputStream inputStream) { - super(ctx, tupleForwarder, feedLogManager, numOfFields); + FeedLogManager feedLogManager, IStreamDataParser streamParser, AsterixInputStream inputStream) { + super(ctx, tupleForwarder, feedLogManager, 1); this.dataParser = streamParser; this.stream = inputStream; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java index 7ae2f41..f1eb870 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java @@ -19,14 +19,12 @@ package org.apache.asterix.external.dataflow; import java.io.IOException; -import java.nio.ByteBuffer; import javax.annotation.Nonnull; import org.apache.asterix.external.api.ITupleForwarder; import org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.external.util.FeedLogManager; -import org.apache.asterix.external.util.FeedMessageUtils; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.comm.VSizeFrame; @@ -35,6 +33,7 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; public class FeedTupleForwarder implements ITupleForwarder { @@ -60,10 +59,10 @@ this.writer = writer; this.appender = new FrameTupleAppender(frame); // Set null feed message - ByteBuffer message = (ByteBuffer) ctx.getSharedObject(); + VSizeFrame message = (VSizeFrame) ctx.getSharedObject(); // a null message - message.put(FeedMessageUtils.NULL_FEED_MESSAGE); - message.flip(); + message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); + message.getBuffer().flip(); initialized = true; } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java index 5e8c022..6cdc2af 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java @@ -18,16 +18,15 @@ */ package org.apache.asterix.external.feed.runtime; -import java.nio.ByteBuffer; import java.util.logging.Level; import 
org.apache.asterix.external.feed.api.ISubscriberRuntime; import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter; import org.apache.asterix.external.feed.dataflow.FeedFrameCollector; import org.apache.asterix.external.feed.management.FeedId; +import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; public class IngestionRuntime extends SubscribableRuntime { @@ -48,7 +47,7 @@ dWriter.subscribe(collector); subscribers.add(collectionRuntime); if (numSubscribers == 0) { - ctx.setSharedObject(ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE)); + ctx.setSharedObject(new VSizeFrame(ctx)); collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject()); adapterRuntimeManager.start(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java index 716468e..8d8bc28 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java @@ -34,13 +34,13 @@ import org.apache.asterix.external.feed.runtime.FeedRuntime; import org.apache.asterix.external.feed.runtime.FeedRuntimeId; import org.apache.asterix.external.util.FeedUtils; +import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; /* @@ -86,7 +86,7 @@ private final FeedRuntimeType runtimeType = FeedRuntimeType.COMPUTE; - private ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE); + private final VSizeFrame message; private final FeedMetaOperatorDescriptor opDesc; @@ -111,6 +111,7 @@ this.connectionId = feedConnectionId; this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext() .getApplicationObject()).getFeedManager(); + this.message = new VSizeFrame(ctx); ctx.setSharedObject(message); this.opDesc = feedMetaOperatorDescriptor; this.recordDescProvider = recordDescProvider; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java index b79707b..47df39e 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java @@ -35,13 +35,13 @@ import org.apache.asterix.external.feed.runtime.FeedRuntimeId; import org.apache.asterix.external.util.FeedUtils; import 
org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable { @@ -87,7 +87,7 @@ private final String targetId; - private final ByteBuffer message = ByteBuffer.allocate(MessagingFrameTupleAppender.MAX_MESSAGE_SIZE); + private final VSizeFrame message; private final IRecordDescriptorProvider recordDescProvider; @@ -106,6 +106,7 @@ this.feedManager = (FeedManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext() .getApplicationObject()).getFeedManager(); this.targetId = targetId; + this.message = new VSizeFrame(ctx); ctx.setSharedObject(message); this.recordDescProvider = recordDescProvider; this.opDesc = feedMetaOperatorDescriptor; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java index 6ba27d8..50ebb71 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java @@ -48,7 +48,6 @@ import org.apache.asterix.external.util.DataflowUtils; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.FeedLogManager; -import org.apache.asterix.external.util.FeedUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -73,8 +72,8 @@ DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser, recordReader, ((IIndexingDatasource) recordReader).getIndexer()); } else if (isFeed) { - FeedTupleForwarder tupleForwarder = (FeedTupleForwarder) DataflowUtils - .getTupleForwarder(configuration, feedLogManager); + FeedTupleForwarder tupleForwarder = + (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager); boolean isChangeFeed = ExternalDataUtils.isChangeFeed(configuration); boolean isRecordWithMeta = ExternalDataUtils.isRecordWithMeta(configuration); if (isRecordWithMeta) { @@ -108,7 +107,7 @@ if (isFeed) { return new FeedStreamDataFlowController(ctx, (FeedTupleForwarder) DataflowUtils.getTupleForwarder(configuration, feedLogManager), - feedLogManager, FeedUtils.getNumOfFields(configuration), streamParser, stream); + feedLogManager, streamParser, stream); } else { return new StreamDataFlowController(ctx, DataflowUtils.getTupleForwarder(configuration, null), streamParser); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java index 51e7e72..8228c39 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java @@ -32,13 +32,14 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType; import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.util.IntSerDeUtils; import org.apache.hyracks.dataflow.std.file.FileSplit; +import org.apache.hyracks.util.IntSerDeUtils; public class FeedUtils { private static String prepareDataverseFeedName(String dataverseName, String feedName) { @@ -49,8 +50,8 @@ ClusterPartition partition) { File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName)); String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName(); - String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(storageDirName, - partition.getPartitionId()); + String storagePartitionPath = + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition.getPartitionId()); // Note: feed adapter instances in a single node share the feed logger // format: 'storage dir name'/partition_#/dataverse/feed/node File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName); @@ -88,20 +89,19 @@ feedLogFileSplit.getIODeviceId(), ctx.getIOManager()).getFile()); } - public static void processFeedMessage(ByteBuffer input, ByteBuffer message, FrameTupleAccessor fta) { + public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta) + throws HyracksDataException { // read the message and reduce the number of tuples fta.reset(input); int tc = fta.getTupleCount() - 1; int offset = fta.getTupleStartOffset(tc); int len = fta.getTupleLength(tc); - message.clear(); - message.put(input.array(), offset, len); - message.flip(); + int newSize = FrameHelper.calcAlignedFrameSizeToStore(1, len, message.getMinSize()); + message.ensureFrameSize(newSize); + message.getBuffer().clear(); + message.getBuffer().put(input.array(), offset, len); + message.getBuffer().flip(); IntSerDeUtils.putInt(input.array(), FrameHelper.getTupleCountOffset(input.capacity()), tc); - } - - public static int getNumOfFields(Map<String, String> configuration) { - return 1; } public static String getFeedMetaTypeName(Map<String, String> configuration) { diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java index c00db7a..3becd96 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java @@ -118,7 +118,8 @@ FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), 
Collections.emptyList()); + TestFrameWriter writer = + FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -159,7 +160,8 @@ FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList()); + TestFrameWriter writer = + FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -207,7 +209,7 @@ FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE); // Non-Active Writer - TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool ConcurrentFramePool framePool = @@ -317,7 +319,7 @@ FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE); // Non-Active Writer - TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool ConcurrentFramePool framePool = @@ -390,7 +392,7 @@ // Spill budget = Memory budget, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); // Non-Active Writer - TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool ConcurrentFramePool framePool = @@ -452,7 +454,7 @@ // Spill budget = Memory budget, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, true, DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); // Non-Active Writer - TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool ConcurrentFramePool framePool = @@ -509,7 +511,7 @@ FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE); // Non-Active Writer - TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); @@ -553,7 +555,8 @@ // No spill, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList()); + TestFrameWriter writer = + FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = 
createInputHandler(ctx, writer, fpa, framePool); @@ -593,7 +596,8 @@ // No spill, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList()); + TestFrameWriter writer = + FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -632,7 +636,7 @@ // No spill, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); // Non-Active Writer - TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); @@ -682,7 +686,7 @@ FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE); // Non-Active Writer - TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); @@ -732,7 +736,7 @@ // No spill, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); // Non-Active Writer - TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE); + TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java index 4bddfa9..c9cc71e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java @@ -33,7 +33,7 @@ } public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations, - Collection<FrameWriterOperation> errorThrowingOperations) { + Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) { CountAnswer openAnswer = createAnswer(FrameWriterOperation.Open, exceptionThrowingOperations, errorThrowingOperations); CountAnswer nextAnswer = @@ -44,7 +44,7 @@ createAnswer(FrameWriterOperation.Fail, exceptionThrowingOperations, errorThrowingOperations); CountAnswer closeAnswer = createAnswer(FrameWriterOperation.Close, exceptionThrowingOperations, errorThrowingOperations); - return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer); + return new TestFrameWriter(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer, deepCopyInputFrames); } public static CountAnswer createAnswer(FrameWriterOperation operation, @@ -59,7 +59,7 @@ } } - public static TestControlledFrameWriter create(int initialFrameSize) { - return new 
TestControlledFrameWriter(initialFrameSize); + public static TestControlledFrameWriter create(int initialFrameSize, boolean deepCopyInputFrames) { + return new TestControlledFrameWriter(initialFrameSize, deepCopyInputFrames); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java index bf168c2..b98bcf7 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestControlledFrameWriter.java @@ -30,8 +30,9 @@ private volatile int currentMultiplier = 0; private volatile int kicks = 0; - public TestControlledFrameWriter(int initialFrameSize) { - super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer()); + public TestControlledFrameWriter(int initialFrameSize, boolean deepCopyInputFrames) { + super(new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), new CountAnswer(), + deepCopyInputFrames); this.initialFrameSize = initialFrameSize; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java index b3492fe..065e64d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/TestFrameWriter.java @@ -29,19 +29,27 @@ private final CountAnswer flushAnswer; private final CountAnswer failAnswer; private final CountAnswer closeAnswer; + private static final int BYTES32KB = 32768; private long openDuration = 0L; private long nextDuration = 0L; private long flushDuration = 0L; private long failDuration = 0L; private long closeDuration = 0L; + // If copyFrames was set, we take a copy of the frame, otherwise, we simply point lastFrame to it + private final boolean deepCopyFrames; + private ByteBuffer lastFrame; public TestFrameWriter(CountAnswer openAnswer, CountAnswer nextAnswer, CountAnswer flushAnswer, - CountAnswer failAnswer, CountAnswer closeAnswer) { + CountAnswer failAnswer, CountAnswer closeAnswer, boolean deepCopyFrames) { this.openAnswer = openAnswer; this.nextAnswer = nextAnswer; this.closeAnswer = closeAnswer; this.flushAnswer = flushAnswer; this.failAnswer = failAnswer; + this.deepCopyFrames = deepCopyFrames; + if (deepCopyFrames) { + lastFrame = ByteBuffer.allocate(BYTES32KB); + } } @Override @@ -56,6 +64,15 @@ @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + if (deepCopyFrames) { + if (lastFrame.capacity() != buffer.capacity()) { + lastFrame = ByteBuffer.allocate(buffer.capacity()); + } + lastFrame.clear(); + lastFrame.put(buffer.array()); + } else { + lastFrame = buffer; + } delay(nextDuration); nextAnswer.call(); } @@ -170,4 +187,8 @@ } } } + + public ByteBuffer getLastFrame() { + return lastFrame; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java index 1623035..ef11b5b 100644 --- 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 1623035..ef11b5b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.comm.IFrameTupleAppender;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 public class FrameTupleAppender extends AbstractFrameAppender implements IFrameTupleAppender {
@@ -51,6 +52,30 @@
             }
             System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldSlots.length * 4, length);
             tupleDataEndOffset += fieldSlots.length * 4 + length;
+            IntSerDeUtils.putInt(getBuffer().array(),
+                    FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()),
+                    tupleCount);
+            return true;
+        }
+        return false;
+    }
+
+    public boolean append(ITupleReference tuple) throws HyracksDataException {
+        int tupleSize = 0;
+        for (int i = 0; i < tuple.getFieldCount(); i++) {
+            tupleSize += tuple.getFieldLength(i);
+        }
+        if (canHoldNewTuple(tuple.getFieldCount(), tupleSize)) {
+            int offset = 0;
+            for (int i = 0; i < tuple.getFieldCount(); ++i) {
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, offset);
+                System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), array,
+                        tupleDataEndOffset + tuple.getFieldCount() * 4, tuple.getFieldLength(i));
+                offset += tuple.getFieldLength(i);
+            }
+            tupleDataEndOffset += tuple.getFieldCount() * 4 + tupleSize;
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
@@ -252,8 +277,8 @@
         int fEndOffset = 0;
         for (int i = 0; i < fields.length; ++i) {
             int fSrcStart = tStartOffset + fSrcSlotsLength + accessor.getFieldStartOffset(tIndex, fields[i]);
-            int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
-                    - accessor.getFieldStartOffset(tIndex, fields[i]);
+            int fLen =
+                    accessor.getFieldEndOffset(tIndex, fields[i]) - accessor.getFieldStartOffset(tIndex, fields[i]);
             System.arraycopy(accessor.getBuffer().array(), fSrcStart, array,
                     tupleDataEndOffset + fTargetSlotsLength + fStartOffset, fLen);
             fEndOffset += fLen;
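To illustrate the new FrameTupleAppender.append(ITupleReference) overload, here is a rough, hypothetical sketch; it assumes the existing Hyracks helpers ArrayTupleBuilder and ArrayTupleReference (package names quoted from memory and possibly inaccurate), and the task context is a placeholder supplied by the caller:

import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;

public class TupleReferenceAppendSketch {
    // Builds a two-field tuple and appends it through the new append(ITupleReference) overload.
    public static boolean appendOneTuple(IHyracksTaskContext ctx) throws HyracksDataException {
        FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));

        ArrayTupleBuilder builder = new ArrayTupleBuilder(2);
        byte[] field0 = { 1, 2, 3 };
        byte[] field1 = { 4, 5 };
        builder.reset();
        builder.addField(field0, 0, field0.length);
        builder.addField(field1, 0, field1.length);

        ArrayTupleReference tuple = new ArrayTupleReference();
        tuple.reset(builder.getFieldEndOffsets(), builder.getByteArray());

        // append() returns false when the tuple cannot be held by the current frame.
        return appender.append(tuple);
    }
}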
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
index 77495dd..e57e12d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppenderWrapper.java
@@ -71,19 +71,6 @@
         FrameUtils.appendSkipEmptyFieldToWriter(outputWriter, frameTupleAppender, fieldSlots, bytes, offset, length);
     }
 
-    public void append(byte[] bytes, int offset, int length) throws HyracksDataException {
-        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, bytes, offset, length);
-    }
-
-    public void append(IFrameTupleAccessor tupleAccessor, int tStartOffset, int tEndOffset)
-            throws HyracksDataException {
-        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, tupleAccessor, tStartOffset, tEndOffset);
-    }
-
-    public void append(IFrameTupleAccessor tupleAccessor, int tIndex) throws HyracksDataException {
-        FrameUtils.appendToWriter(outputWriter, frameTupleAppender, tupleAccessor, tIndex);
-    }
-
     public void appendConcat(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1)
             throws HyracksDataException {
         FrameUtils.appendConcatToWriter(outputWriter, frameTupleAppender, accessor0, tIndex0, accessor1, tIndex1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 345c506..cae659d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -18,32 +18,81 @@
  */
 package org.apache.hyracks.dataflow.common.io;
 
+import java.io.PrintStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.util.IntSerDeUtils;
 
+/**
+ * A frame tuple appender that appends messages stored in the task context when pushing frames forward.
+ * This appender must only be used on a network boundary.
+ */
 public class MessagingFrameTupleAppender extends FrameTupleAppender {
 
-    public static final int MAX_MESSAGE_SIZE = 100;
     private final IHyracksTaskContext ctx;
+    private static final int NULL_MESSAGE_SIZE = 1;
+    public static final byte NULL_FEED_MESSAGE = 0x01;
+    public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
+    public static final byte SNAPSHOT_MESSAGE = 0x03;
 
     public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
         this.ctx = ctx;
     }
 
+    public static void printMessage(VSizeFrame message, PrintStream out) throws HyracksDataException {
+        out.println(getMessageString(message));
+    }
+
+    public static String getMessageString(VSizeFrame message) throws HyracksDataException {
+        StringBuilder aString = new StringBuilder();
+        aString.append("Message Type: ");
+        switch (getMessageType(message)) {
+            case NULL_FEED_MESSAGE:
+                aString.append("Null, ");
+                break;
+            case ACK_REQ_FEED_MESSAGE:
+                aString.append("Ack Request, ");
+                break;
+            case SNAPSHOT_MESSAGE:
+                aString.append("Snapshot, ");
+                break;
+            default:
+                aString.append("Unknown, ");
+                break;
+        }
+        aString.append("Message Length: ");
+        int messageLength = message.getBuffer().remaining();
+        aString.append(messageLength);
+        return aString.toString();
+    }
+
+    public static byte getMessageType(VSizeFrame message) throws HyracksDataException {
+        switch (message.getBuffer().array()[0]) {
+            case NULL_FEED_MESSAGE:
+                return NULL_FEED_MESSAGE;
+            case ACK_REQ_FEED_MESSAGE:
+                return ACK_REQ_FEED_MESSAGE;
+            case SNAPSHOT_MESSAGE:
+                return SNAPSHOT_MESSAGE;
+            default:
+                throw new HyracksDataException("Unknown message type");
+        }
+    }
+
     @Override
     protected boolean canHoldNewTuple(int fieldCount, int dataLength) throws HyracksDataException {
-        if (hasEnoughSpace(fieldCount, dataLength + MAX_MESSAGE_SIZE)) {
+        if (hasEnoughSpace(fieldCount + 1, dataLength + NULL_MESSAGE_SIZE)) {
             return true;
         }
         if (tupleCount == 0) {
-            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount, dataLength + MAX_MESSAGE_SIZE,
-                    frame.getMinSize()));
+            frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(fieldCount + 1,
+                    dataLength + NULL_MESSAGE_SIZE, frame.getMinSize()));
             reset(frame.getBuffer(), true);
             return true;
         }
@@ -52,13 +101,32 @@
 
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
-        appendMessage((ByteBuffer) ctx.getSharedObject());
+        // If the message fits, we append it; otherwise, we append a null message, then send a dedicated
+        // frame that carries only the message
+        ByteBuffer message = ((VSizeFrame) ctx.getSharedObject()).getBuffer();
+        int messageSize = message.limit() - message.position();
+        if (hasEnoughSpace(1, messageSize)) {
+            appendMessage(message);
+            forward(outWriter);
+        } else {
+            if (tupleCount > 0) {
+                appendNullMessage();
+                forward(outWriter);
+            }
+            if (!hasEnoughSpace(1, messageSize)) {
+                frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
+                reset(frame.getBuffer(), true);
+            }
+            appendMessage(message);
+            forward(outWriter);
+        }
+    }
+
+    private void forward(IFrameWriter outWriter) throws HyracksDataException {
         getBuffer().clear();
         outWriter.nextFrame(getBuffer());
-        if (clearFrame) {
-            frame.reset();
-            reset(getBuffer(), true);
-        }
+        frame.reset();
+        reset(getBuffer(), true);
     }
 
     private void appendMessage(ByteBuffer message) {
@@ -69,4 +137,13 @@
         ++tupleCount;
         IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
     }
+
+    private void appendNullMessage() {
+        array[tupleDataEndOffset] = NULL_FEED_MESSAGE;
+        tupleDataEndOffset++;
+        IntSerDeUtils.putInt(getBuffer().array(),
+                FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
+        ++tupleCount;
+        IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
+    }
 }
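As a rough sketch of the producer-side flow the new appender expects (not part of this change; the wrapper class and the one-byte null message are illustrative placeholders, and the real wiring lives in the feed operators touched by this patch):

import java.nio.ByteBuffer;

import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;

public class MessagingAppenderSketch {
    // Sends whatever message is currently registered in the task context along with the data frame.
    public static void flushWithMessage(IHyracksTaskContext ctx, IFrameWriter writer)
            throws HyracksDataException {
        // Register a one-byte null message as the shared object; a real feed would
        // put an ACK request or snapshot marker here instead.
        VSizeFrame message = new VSizeFrame(ctx);
        ByteBuffer buffer = message.getBuffer();
        buffer.clear();
        buffer.put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
        buffer.flip();
        ctx.setSharedObject(message);

        MessagingFrameTupleAppender appender = new MessagingFrameTupleAppender(ctx);
        appender.reset(new VSizeFrame(ctx), true);
        // ... append data tuples here ...
        // write() piggybacks the message on the current frame if it fits; otherwise it sends the
        // frame with a null message followed by a dedicated message-only frame, as implemented above.
        appender.write(writer, true);
    }
}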
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
index e90d8b0..f6996f1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningWithMessageConnectorDescriptor.java
@@ -30,6 +30,11 @@
 
     private static final long serialVersionUID = 1L;
 
+    /**
+     * This connector enables sending messages alongside data tuples. Messages are sent on flush() calls.
+     * It broadcasts messages to all consumers. If the message doesn't fit in the current frame for a specific
+     * receiver, the current frame is sent, followed by a frame that carries only the message.
+     */
     public MToNPartitioningWithMessageConnectorDescriptor(IConnectorDescriptorRegistry spec,
             ITuplePartitionComputerFactory tpcf) {
         super(spec, tpcf);
@@ -38,7 +43,7 @@
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         return new PartitionWithMessageDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
                 tpcf.createPartitioner());
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index 98bd860..f2ec64b 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -44,6 +44,7 @@
     private WorkspaceFileFactory fileFactory;
     private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
+    private Object sharedObject;
 
     public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
         this.jobletContext = jobletContext;
@@ -149,10 +150,11 @@
 
     @Override
     public Object getSharedObject() {
-        return null;
+        return sharedObject;
     }
 
     @Override
     public void setSharedObject(Object sharedObject) {
+        this.sharedObject = sharedObject;
     }
 }
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/880
Gerrit-MessageType: merged
Gerrit-Change-Id: If4336e9c234e8d282798cfba9f48432b46cccfca
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <hubail...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>