>From Wail Alkowaileet <[email protected]>:

Wail Alkowaileet has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18175 )


Change subject: [WIP] Prep external database writer
......................................................................

[WIP] Prep external database writer

Change-Id: I81a86870befbc75720b2e2e585c9741463c707c6
---
R 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
A 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
A 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
R 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
A 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
A 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalDatabaseWriter.java
A 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
11 files changed, 243 insertions(+), 33 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/75/18175/1

diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 8d8ca80..cea0689 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -106,7 +106,7 @@
 import 
org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor;
 import 
org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import 
org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
-import org.apache.asterix.runtime.writer.ExternalWriterFactory;
+import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
 import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
 import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -762,7 +762,7 @@
         String fileExtension = ExternalWriterProvider.getFileExtension(sink);
         int maxResult = ExternalWriterProvider.getMaxResult(sink);
         IExternalFilePrinterFactory printerFactory = 
ExternalWriterProvider.createPrinter(sink, sourceType);
-        ExternalWriterFactory writerFactory = new 
ExternalWriterFactory(fileWriterFactory, printerFactory,
+        ExternalFileWriterFactory writerFactory = new 
ExternalFileWriterFactory(fileWriterFactory, printerFactory,
                 fileExtension, maxResult, dynamicPathEvalFactory, staticPath, 
pathSourceLocation);
         SinkExternalWriterRuntimeFactory runtime = new 
SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
                 partitionComparatorFactories, inputDesc, writerFactory);
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index 7105efa..c6d1dbe 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -54,7 +54,7 @@
     @Override
     public String getPartitionDirectory(IFrameTupleReference tuple) throws 
HyracksDataException {
         if (!appendPrefix(tuple)) {
-            return ExternalWriter.UNRESOLVABLE_PATH;
+            return ExternalFileWriter.UNRESOLVABLE_PATH;
         }

         if (dirStringBuilder.length() > 0 && 
dirStringBuilder.charAt(dirStringBuilder.length() - 1) != fileSeparator) {
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalDatabaseWriter.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalDatabaseWriter.java
new file mode 100644
index 0000000..676d6af
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalDatabaseWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+public class ExternalDatabaseWriter implements IExternalWriter {
+    @Override
+    public void open() throws HyracksDataException {
+
+    }
+
+    @Override
+    public void initNewPartition(IFrameTupleReference tuple) throws 
HyracksDataException {
+        // Set the key here
+    }
+
+    @Override
+    public void write(IValueReference value) throws HyracksDataException {
+
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
similarity index 94%
rename from 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
rename to 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
index 5fc07af..f9f98da 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -23,7 +23,7 @@
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
-final class ExternalWriter implements IExternalWriter {
+final class ExternalFileWriter implements IExternalWriter {
     static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
     private final IPathResolver pathResolver;
     private final IExternalFileWriter writer;
@@ -31,7 +31,7 @@
     private String partitionPath;
     private int tupleCounter;

-    public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter 
writer, int maxResultPerFile) {
+    public ExternalFileWriter(IPathResolver pathResolver, IExternalFileWriter 
writer, int maxResultPerFile) {
         this.pathResolver = pathResolver;
         this.writer = writer;
         this.maxResultPerFile = maxResultPerFile;
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
similarity index 88%
rename from 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
rename to 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
index e7c0db0..d5f01e8 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;

-public class ExternalWriterFactory implements IExternalWriterFactory {
+public class ExternalFileWriterFactory implements IExternalWriterFactory {
     private static final long serialVersionUID = 1412969574113419638L;
     private final IExternalFileWriterFactory writerFactory;
     private final IExternalFilePrinterFactory printerFactory;
@@ -38,9 +38,9 @@
     private final String staticPath;
     private final SourceLocation pathSourceLocation;

-    public ExternalWriterFactory(IExternalFileWriterFactory writerFactory, 
IExternalFilePrinterFactory printerFactory,
-            String fileExtension, int maxResult, IScalarEvaluatorFactory 
pathEvalFactory, String staticPath,
-            SourceLocation pathSourceLocation) {
+    public ExternalFileWriterFactory(IExternalFileWriterFactory writerFactory,
+            IExternalFilePrinterFactory printerFactory, String fileExtension, 
int maxResult,
+            IScalarEvaluatorFactory pathEvalFactory, String staticPath, 
SourceLocation pathSourceLocation) {
         this.writerFactory = writerFactory;
         this.printerFactory = printerFactory;
         this.fileExtension = fileExtension;
@@ -65,6 +65,6 @@
             resolver = new StaticPathResolver(fileExtension, fileSeparator, 
partition, staticPath);
         }
         IExternalFileWriter writer = writerFactory.createWriter(context, 
printerFactory);
-        return new ExternalWriter(resolver, writer, maxResult);
+        return new ExternalFileWriter(resolver, writer, maxResult);
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
new file mode 100644
index 0000000..39b3647
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+interface IWriterPartitioner {
+    boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws 
HyracksDataException;
+
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
new file mode 100644
index 0000000..3cb44ff
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+class KeyWriterPartitioner implements IWriterPartitioner {
+    public static final IWriterPartitioner INSTANCE = new 
KeyWriterPartitioner();
+
+    private KeyWriterPartitioner() {
+    }
+
+    @Override
+    public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) 
throws HyracksDataException {
+        // Every key is a partition
+        return true;
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
new file mode 100644
index 0000000..967e23c
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+class NoOpWriterPartitioner implements IWriterPartitioner {
+    public static final IWriterPartitioner INSTANCE = new 
NoOpWriterPartitioner();
+
+    private NoOpWriterPartitioner() {
+    }
+
+    @Override
+    public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) 
throws HyracksDataException {
+        return false;
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
index 01e137b..97f626b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
@@ -23,41 +23,29 @@
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
 import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import 
org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
-import 
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import 
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;

 final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
     private final int sourceColumn;
-    private final int[] partitionColumns;
+    private final IWriterPartitioner partitioner;
     private final IPointable sourceValue;
-    private final PointableTupleReference partitionColumnsPrevCopy;
-    private final PermutingFrameTupleReference partitionColumnsRef;
-    private final IBinaryComparator[] partitionComparators;
     private final IExternalWriter writer;
     private FrameTupleAccessor tupleAccessor;
     private FrameTupleReference tupleRef;
     private boolean first;
     private IFrameWriter frameWriter;

-    SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns, 
IBinaryComparator[] partitionComparators,
-            RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+    SinkExternalWriterRuntime(int sourceColumn, IWriterPartitioner 
partitioner, RecordDescriptor inputRecordDesc,
+            IExternalWriter writer) {
         this.sourceColumn = sourceColumn;
-        this.partitionColumns = partitionColumns;
+        this.partitioner = partitioner;
         this.sourceValue = new VoidPointable();
-        partitionColumnsRef = new 
PermutingFrameTupleReference(partitionColumns);
-        partitionColumnsPrevCopy =
-                PointableTupleReference.create(partitionColumns.length, 
ArrayBackedValueStorage::new);
-        this.partitionComparators = partitionComparators;
         this.inputRecordDesc = inputRecordDesc;
         this.writer = writer;
         first = true;
@@ -83,8 +71,6 @@
             }
             setValue(tupleRef, sourceColumn, sourceValue);
             writer.write(sourceValue);
-            partitionColumnsRef.reset(tupleAccessor, i);
-            partitionColumnsPrevCopy.set(partitionColumnsRef);
         }
     }

@@ -111,8 +97,7 @@
             return true;
         }

-        return !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, 
tupleAccessor, index, partitionColumns,
-                partitionComparators);
+        return partitioner.isNewPartition(tupleAccessor, index);
     }

     private void setValue(IFrameTupleReference tuple, int column, IPointable 
value) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
index 6220dec..023f671 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
@@ -49,12 +49,27 @@
     @Override
     public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
         IExternalWriter writer = writerFactory.createWriter(ctx);
+        IWriterPartitioner partitioner = createPartitioner();
+        SinkExternalWriterRuntime runtime =
+                new SinkExternalWriterRuntime(sourceColumn, partitioner, 
inputRecordDescriptor, writer);
+        return new IPushRuntime[] { runtime };
+    }
+
+    private IWriterPartitioner createPartitioner() {
+        if (partitionColumn.length == 0) {
+            return NoOpWriterPartitioner.INSTANCE;
+        }
+        /*
+         * else if(keyColumns.length > 0) {
+         *    return new KeyWriterPartitioner()
+         * }
+         */
+
         IBinaryComparator[] partitionComparators = new 
IBinaryComparator[partitionComparatorFactories.length];
         for (int i = 0; i < partitionComparatorFactories.length; i++) {
             partitionComparators[i] = 
partitionComparatorFactories[i].createBinaryComparator();
         }
-        SinkExternalWriterRuntime runtime = new 
SinkExternalWriterRuntime(sourceColumn, partitionColumn,
-                partitionComparators, inputRecordDescriptor, writer);
-        return new IPushRuntime[] { runtime };
+
+        return new WriterPartitioner(partitionColumn, partitionComparators);
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
new file mode 100644
index 0000000..991581c
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import 
org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
+import 
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import 
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+
+class WriterPartitioner implements IWriterPartitioner {
+    private final int[] partitionColumns;
+    private final IBinaryComparator[] partitionComparators;
+    private final PointableTupleReference partitionColumnsPrevCopy;
+    private final PermutingFrameTupleReference partitionColumnsRef;
+
+    public WriterPartitioner(int[] partitionColumns, IBinaryComparator[] 
partitionComparators) {
+        this.partitionColumns = partitionColumns;
+        this.partitionComparators = partitionComparators;
+        partitionColumnsRef = new 
PermutingFrameTupleReference(partitionColumns);
+        partitionColumnsPrevCopy =
+                PointableTupleReference.create(partitionColumns.length, 
ArrayBackedValueStorage::new);
+    }
+
+    @Override
+    public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) 
throws HyracksDataException {
+        boolean newPartition = 
!PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, tupleAccessor, 
index,
+                partitionColumns, partitionComparators);
+
+        // Set previous
+        partitionColumnsRef.reset(tupleAccessor, index);
+        partitionColumnsPrevCopy.set(partitionColumnsRef);
+
+        return newPartition;
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18175
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I81a86870befbc75720b2e2e585c9741463c707c6
Gerrit-Change-Number: 18175
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange

Reply via email to