From Wail Alkowaileet <[email protected]>:
Wail Alkowaileet has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18175 )
Change subject: [WIP] Prep external database writer
......................................................................
[WIP] Prep external database writer
Change-Id: I81a86870befbc75720b2e2e585c9741463c707c6
---
R
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
A
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
A
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
R
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
A
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
A
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalDatabaseWriter.java
A
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
11 files changed, 243 insertions(+), 33 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/75/18175/1
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 8d8ca80..cea0689 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -106,7 +106,7 @@
import
org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor;
import
org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
import
org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
-import org.apache.asterix.runtime.writer.ExternalWriterFactory;
+import org.apache.asterix.runtime.writer.ExternalFileWriterFactory;
import org.apache.asterix.runtime.writer.IExternalFilePrinterFactory;
import org.apache.asterix.runtime.writer.IExternalFileWriterFactory;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -762,7 +762,7 @@
String fileExtension = ExternalWriterProvider.getFileExtension(sink);
int maxResult = ExternalWriterProvider.getMaxResult(sink);
IExternalFilePrinterFactory printerFactory =
ExternalWriterProvider.createPrinter(sink, sourceType);
- ExternalWriterFactory writerFactory = new
ExternalWriterFactory(fileWriterFactory, printerFactory,
+ ExternalFileWriterFactory writerFactory = new
ExternalFileWriterFactory(fileWriterFactory, printerFactory,
fileExtension, maxResult, dynamicPathEvalFactory, staticPath,
pathSourceLocation);
SinkExternalWriterRuntimeFactory runtime = new
SinkExternalWriterRuntimeFactory(sourceColumn, partitionColumns,
partitionComparatorFactories, inputDesc, writerFactory);
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
index 7105efa..c6d1dbe 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/DynamicPathResolver.java
@@ -54,7 +54,7 @@
@Override
public String getPartitionDirectory(IFrameTupleReference tuple) throws
HyracksDataException {
if (!appendPrefix(tuple)) {
- return ExternalWriter.UNRESOLVABLE_PATH;
+ return ExternalFileWriter.UNRESOLVABLE_PATH;
}
if (dirStringBuilder.length() > 0 &&
dirStringBuilder.charAt(dirStringBuilder.length() - 1) != fileSeparator) {
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalDatabaseWriter.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalDatabaseWriter.java
new file mode 100644
index 0000000..676d6af
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalDatabaseWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.writer;
+
+import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+/** Work-in-progress IExternalWriter stub (change subject: "Prep external database writer"); no method does anything yet. */
+public class ExternalDatabaseWriter implements IExternalWriter {
+ @Override
+ public void open() throws HyracksDataException {
+ // Not implemented yet: nothing is opened or initialized.
+ }
+
+ @Override
+ public void initNewPartition(IFrameTupleReference tuple) throws
HyracksDataException {
+ // TODO: set the key here (presumably derived from "tuple" - confirm); currently a no-op
+ }
+
+ @Override
+ public void write(IValueReference value) throws HyracksDataException {
+ // Not implemented yet: "value" is currently ignored.
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ // Not implemented yet: no-op.
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ // Not implemented yet: no-op.
+ }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
similarity index 94%
rename from
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
rename to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
index 5fc07af..f9f98da 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriter.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriter.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-final class ExternalWriter implements IExternalWriter {
+final class ExternalFileWriter implements IExternalWriter {
static final String UNRESOLVABLE_PATH = "UNRESOLVABLE_PATH";
private final IPathResolver pathResolver;
private final IExternalFileWriter writer;
@@ -31,7 +31,7 @@
private String partitionPath;
private int tupleCounter;
- public ExternalWriter(IPathResolver pathResolver, IExternalFileWriter
writer, int maxResultPerFile) {
+ public ExternalFileWriter(IPathResolver pathResolver, IExternalFileWriter
writer, int maxResultPerFile) {
this.pathResolver = pathResolver;
this.writer = writer;
this.maxResultPerFile = maxResultPerFile;
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
similarity index 88%
rename from
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
rename to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
index e7c0db0..d5f01e8 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalWriterFactory.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/writer/ExternalFileWriterFactory.java
@@ -28,7 +28,7 @@
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
-public class ExternalWriterFactory implements IExternalWriterFactory {
+public class ExternalFileWriterFactory implements IExternalWriterFactory {
private static final long serialVersionUID = 1412969574113419638L;
private final IExternalFileWriterFactory writerFactory;
private final IExternalFilePrinterFactory printerFactory;
@@ -38,9 +38,9 @@
private final String staticPath;
private final SourceLocation pathSourceLocation;
- public ExternalWriterFactory(IExternalFileWriterFactory writerFactory,
IExternalFilePrinterFactory printerFactory,
- String fileExtension, int maxResult, IScalarEvaluatorFactory
pathEvalFactory, String staticPath,
- SourceLocation pathSourceLocation) {
+ public ExternalFileWriterFactory(IExternalFileWriterFactory writerFactory,
+ IExternalFilePrinterFactory printerFactory, String fileExtension,
int maxResult,
+ IScalarEvaluatorFactory pathEvalFactory, String staticPath,
SourceLocation pathSourceLocation) {
this.writerFactory = writerFactory;
this.printerFactory = printerFactory;
this.fileExtension = fileExtension;
@@ -65,6 +65,6 @@
resolver = new StaticPathResolver(fileExtension, fileSeparator,
partition, staticPath);
}
IExternalFileWriter writer = writerFactory.createWriter(context,
printerFactory);
- return new ExternalWriter(resolver, writer, maxResult);
+ return new ExternalFileWriter(resolver, writer, maxResult);
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
new file mode 100644
index 0000000..39b3647
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/IWriterPartitioner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+/** Asked by the sink writer runtime whether the tuple at "index" of "tupleAccessor" starts a new partition. */
+interface IWriterPartitioner {
+ boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index) throws
HyracksDataException;
+
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
new file mode 100644
index 0000000..3cb44ff
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/KeyWriterPartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+/** Stateless singleton (private constructor, shared INSTANCE) whose isNewPartition() is unconditionally true. */
+class KeyWriterPartitioner implements IWriterPartitioner {
+ public static final IWriterPartitioner INSTANCE = new
KeyWriterPartitioner();
+
+ private KeyWriterPartitioner() {
+ }
+
+ @Override
+ public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index)
throws HyracksDataException {
+ // Every key is its own partition, so each tuple is reported as a new one; both arguments are ignored
+ return true;
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
new file mode 100644
index 0000000..967e23c
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/NoOpWriterPartitioner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+/** Stateless singleton (private constructor, shared INSTANCE) whose isNewPartition() is unconditionally false. */
+class NoOpWriterPartitioner implements IWriterPartitioner {
+ public static final IWriterPartitioner INSTANCE = new
NoOpWriterPartitioner();
+
+ private NoOpWriterPartitioner() {
+ }
+
+ @Override
+ public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index)
throws HyracksDataException {
+ return false;
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
index 01e137b..97f626b 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntime.java
@@ -23,41 +23,29 @@
import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputSinkPushRuntime;
import org.apache.hyracks.algebricks.runtime.writers.IExternalWriter;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import
org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
-import
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
final class SinkExternalWriterRuntime extends AbstractOneInputSinkPushRuntime {
private final int sourceColumn;
- private final int[] partitionColumns;
+ private final IWriterPartitioner partitioner;
private final IPointable sourceValue;
- private final PointableTupleReference partitionColumnsPrevCopy;
- private final PermutingFrameTupleReference partitionColumnsRef;
- private final IBinaryComparator[] partitionComparators;
private final IExternalWriter writer;
private FrameTupleAccessor tupleAccessor;
private FrameTupleReference tupleRef;
private boolean first;
private IFrameWriter frameWriter;
- SinkExternalWriterRuntime(int sourceColumn, int[] partitionColumns,
IBinaryComparator[] partitionComparators,
- RecordDescriptor inputRecordDesc, IExternalWriter writer) {
+ SinkExternalWriterRuntime(int sourceColumn, IWriterPartitioner
partitioner, RecordDescriptor inputRecordDesc,
+ IExternalWriter writer) {
this.sourceColumn = sourceColumn;
- this.partitionColumns = partitionColumns;
+ this.partitioner = partitioner;
this.sourceValue = new VoidPointable();
- partitionColumnsRef = new
PermutingFrameTupleReference(partitionColumns);
- partitionColumnsPrevCopy =
- PointableTupleReference.create(partitionColumns.length,
ArrayBackedValueStorage::new);
- this.partitionComparators = partitionComparators;
this.inputRecordDesc = inputRecordDesc;
this.writer = writer;
first = true;
@@ -83,8 +71,6 @@
}
setValue(tupleRef, sourceColumn, sourceValue);
writer.write(sourceValue);
- partitionColumnsRef.reset(tupleAccessor, i);
- partitionColumnsPrevCopy.set(partitionColumnsRef);
}
}
@@ -111,8 +97,7 @@
return true;
}
- return !PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy,
tupleAccessor, index, partitionColumns,
- partitionComparators);
+ return partitioner.isNewPartition(tupleAccessor, index);
}
private void setValue(IFrameTupleReference tuple, int column, IPointable
value) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
index 6220dec..023f671 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/SinkExternalWriterRuntimeFactory.java
@@ -49,12 +49,27 @@
@Override
public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
IExternalWriter writer = writerFactory.createWriter(ctx);
+ IWriterPartitioner partitioner = createPartitioner();
+ SinkExternalWriterRuntime runtime =
+ new SinkExternalWriterRuntime(sourceColumn, partitioner,
inputRecordDescriptor, writer);
+ return new IPushRuntime[] { runtime };
+ }
+
+ private IWriterPartitioner createPartitioner() {
+ if (partitionColumn.length == 0) {
+ return NoOpWriterPartitioner.INSTANCE;
+ }
+ /*
+ * else if(keyColumns.length > 0) {
+ * return KeyWriterPartitioner.INSTANCE;
+ * }
+ */
+
IBinaryComparator[] partitionComparators = new
IBinaryComparator[partitionComparatorFactories.length];
for (int i = 0; i < partitionComparatorFactories.length; i++) {
partitionComparators[i] =
partitionComparatorFactories[i].createBinaryComparator();
}
- SinkExternalWriterRuntime runtime = new
SinkExternalWriterRuntime(sourceColumn, partitionColumn,
- partitionComparators, inputRecordDescriptor, writer);
- return new IPushRuntime[] { runtime };
+
+ return new WriterPartitioner(partitionColumn, partitionComparators);
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
new file mode 100644
index 0000000..991581c
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/writer/WriterPartitioner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.writer;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import
org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
+import
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import
org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+/** Reports a new partition when a tuple's partition columns differ from the copy kept from the previous call. */
+class WriterPartitioner implements IWriterPartitioner {
+ private final int[] partitionColumns;
+ private final IBinaryComparator[] partitionComparators;
+ private final PointableTupleReference partitionColumnsPrevCopy;
+ private final PermutingFrameTupleReference partitionColumnsRef;
+
+ public WriterPartitioner(int[] partitionColumns, IBinaryComparator[]
partitionComparators) {
+ this.partitionColumns = partitionColumns;
+ this.partitionComparators = partitionComparators;
+ partitionColumnsRef = new
PermutingFrameTupleReference(partitionColumns);
+ partitionColumnsPrevCopy =
+ PointableTupleReference.create(partitionColumns.length,
ArrayBackedValueStorage::new);
+ }
+
+ @Override
+ public boolean isNewPartition(FrameTupleAccessor tupleAccessor, int index)
throws HyracksDataException {
+ boolean newPartition =
!PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, tupleAccessor,
index,
+ partitionColumns, partitionComparators);
+ // Snapshot this tuple's partition columns (after the comparison above) for the next call to compare against.
+ // NOTE(review): the snapshot is refreshed only here; a tuple the caller never passes in is not remembered - confirm.
+ partitionColumnsRef.reset(tupleAccessor, index);
+ partitionColumnsPrevCopy.set(partitionColumnsRef);
+
+ return newPartition;
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18175
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I81a86870befbc75720b2e2e585c9741463c707c6
Gerrit-Change-Number: 18175
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange