Till Westmann has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1404
Change subject: WIP - add result format metadata for result sets
......................................................................
WIP - add result format metadata for result sets
- provide an arbitrary Serializable as result metadata (instead of a fixed
boolean 'ordered' property)
- fix hashCode/equals for DatasetDirectoryRecord
Change-Id: Ifc4eddd23f508962483f3470e88760bf8124dc25
---
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
M
hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
M
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
21 files changed, 71 insertions(+), 55 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/04/1404/1
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 61c1aff..88eb712 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -739,7 +740,7 @@
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories,
RecordDescriptor inputDesc, boolean ordered,
+ int[] printColumns, IPrinterFactory[] printerFactories,
RecordDescriptor inputDesc, Serializable metadata,
JobSpecification spec) throws AlgebricksException {
ResultSetDataSink rsds = (ResultSetDataSink) sink;
ResultSetSinkId rssId = rsds.getId();
@@ -748,7 +749,7 @@
try {
IResultSerializerFactory resultSerializedAppenderFactory =
resultSerializerFactoryProvider
.getAqlResultSerializerFactoryProvider(printColumns,
printerFactories, getWriterFactory());
- resultWriter = new ResultWriterOperatorDescriptor(spec, rsId,
ordered, getResultAsyncMode(),
+ resultWriter = new ResultWriterOperatorDescriptor(spec, rsId,
metadata, getResultAsyncMode(),
resultSerializedAppenderFactory);
} catch (IOException e) {
throw new AlgebricksException(e);
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 5d1f402..004d8f7 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.core.algebra.metadata;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -56,7 +57,7 @@
throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories,
RecordDescriptor inputDesc, boolean ordered,
+ int[] printColumns, IPrinterFactory[] printerFactories,
RecordDescriptor inputDesc, Serializable metadata,
JobSpecification spec) throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getWriteResultRuntime(IDataSource<S> dataSource,
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index b3e8385..f1b584e 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -104,7 +104,7 @@
context, columns);
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
runtimeAndConstraints = mp.getResultHandleRuntime(
- resultOp.getDataSink(), columns, pf, inputDesc, true, spec);
+ resultOp.getDataSink(), columns, pf, inputDesc, null, spec);
builder.contributeHyracksOperator(resultOp,
runtimeAndConstraints.first);
ILogicalOperator src = resultOp.getInputs().get(0).getValue();
diff --git
a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 3f7f56d..70ee508 100644
---
a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.algebricks.examples.piglet.metadata;
+import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -162,7 +163,7 @@
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getResultHandleRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories,
RecordDescriptor inputDesc, boolean ordered,
+ int[] printColumns, IPrinterFactory[] printerFactories,
RecordDescriptor inputDesc, Serializable metadata,
JobSpecification spec) throws AlgebricksException {
return null;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
index 752a6b3..64f7417 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
@@ -85,7 +85,7 @@
return false;
}
NetworkAddress on = (NetworkAddress) o;
- return on.port == port && on.address == address;
+ return on.port == port && on.address.equals(address);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
index a50e1ee..bb3a164 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
@@ -87,14 +87,16 @@
}
@Override
+ public int hashCode() {
+ return address.hashCode();
+ }
+
+ @Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
- if (!(o instanceof DatasetDirectoryRecord)) {
- return false;
- }
- return address.equals(((DatasetDirectoryRecord) o).address);
+ return o instanceof DatasetDirectoryRecord &&
address.equals(((DatasetDirectoryRecord) o).address);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index 34ed65c..4bb26c1 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -58,6 +58,7 @@
this.exceptions = exceptions;
}
+ @Override
public long getTimestamp() {
return timestamp;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index fa22d8e..95da121 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.api.dataset;
+import java.io.Serializable;
+
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -25,11 +27,11 @@
import org.apache.hyracks.api.job.JobId;
public interface IDatasetPartitionManager extends IDatasetManager {
- public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx,
ResultSetId rsId, boolean orderedResult,
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx,
ResultSetId rsId, Serializable metadata,
boolean asyncMode, int partition, int nPartitions) throws
HyracksException;
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId,
int partition, int nPartitions,
- boolean orderedResult, boolean emptyResult) throws
HyracksException;
+ Serializable metadata, boolean emptyResult) throws
HyracksException;
public void reportPartitionWriteCompletion(JobId jobId, ResultSetId
resultSetId, int partition)
throws HyracksException;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
index d18e6cf..65f3ea6 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
@@ -19,5 +19,5 @@
package org.apache.hyracks.api.dataset;
public interface IDatasetStateRecord {
- public long getTimestamp();
+ long getTimestamp();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
index 2285981..e5cdcb3 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
@@ -18,18 +18,20 @@
*/
package org.apache.hyracks.api.dataset;
+import java.io.Serializable;
+
public class ResultSetMetaData {
- private final boolean ordered;
+ private final Serializable metadata;
private final DatasetDirectoryRecord[] records;
- public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[]
records) {
- this.ordered = ordered;
+ public ResultSetMetaData(Serializable metadata, DatasetDirectoryRecord[]
records) {
+ this.metadata = metadata;
this.records = records;
}
- public boolean getOrderedResult() {
- return ordered;
+ public Serializable metadata() {
+ return metadata;
}
public DatasetDirectoryRecord[] getRecords() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
index f60b3c3..6226fde 100644
---
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
+++
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
@@ -48,13 +48,11 @@
@Override
public IHyracksDatasetReader createReader(JobId jobId, ResultSetId
resultSetId) throws HyracksDataException {
- IHyracksDatasetReader reader = null;
try {
- reader = new
HyracksDatasetReader(datasetDirectoryServiceConnection, netManager,
datasetClientCtx, jobId,
+ return new HyracksDatasetReader(datasetDirectoryServiceConnection,
netManager, datasetClientCtx, jobId,
resultSetId);
} catch (Exception e) {
throw new HyracksDataException(e);
}
- return reader;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index b6c9a08..95116b4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -115,7 +115,7 @@
CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
(CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
ccs.getWorkQueue().schedule(new
RegisterResultPartitionLocationWork(ccs,
- rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+ rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getMetadata(), rrplf.getEmptyResult(),
rrplf.getPartition(), rrplf.getNPartitions(),
rrplf.getNetworkAddress()));
break;
case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 4d7d1c3..52765a7 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.cc.dataset;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -91,13 +92,13 @@
}
@Override
- public synchronized void registerResultPartitionLocation(JobId jobId,
ResultSetId rsId, boolean orderedResult,
+ public synchronized void registerResultPartitionLocation(JobId jobId,
ResultSetId rsId, Serializable metadata,
boolean emptyResult, int partition, int nPartitions,
NetworkAddress networkAddress) {
DatasetJobRecord djr = getDatasetJobRecord(jobId);
ResultSetMetaData resultSetMetaData = djr.get(rsId);
if (resultSetMetaData == null) {
- resultSetMetaData = new ResultSetMetaData(orderedResult, new
DatasetDirectoryRecord[nPartitions]);
+ resultSetMetaData = new ResultSetMetaData(metadata, new
DatasetDirectoryRecord[nPartitions]);
djr.put(rsId, resultSetMetaData);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 9e4e03e..d39ae51 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.cc.dataset;
+import java.io.Serializable;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -34,7 +35,7 @@
public interface IDatasetDirectoryService extends IJobLifecycleListener,
IDatasetManager {
public void init(ExecutorService executor);
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId,
boolean orderedResult,
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId,
Serializable metadata,
boolean emptyResult, int partition, int nPartitions,
NetworkAddress networkAddress);
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId
rsId, int partition);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 4e4732d..95b5613 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.cc.work;
+import java.io.Serializable;
+
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.JobId;
@@ -31,7 +33,7 @@
private final ResultSetId rsId;
- private final boolean orderedResult;
+ private final Serializable metadata;
private final boolean emptyResult;
@@ -42,11 +44,11 @@
private final NetworkAddress networkAddress;
public RegisterResultPartitionLocationWork(ClusterControllerService ccs,
JobId jobId, ResultSetId rsId,
- boolean orderedResult, boolean emptyResult, int partition, int
nPartitions, NetworkAddress networkAddress) {
+ Serializable metadata, boolean emptyResult, int partition, int
nPartitions, NetworkAddress networkAddress) {
this.ccs = ccs;
this.jobId = jobId;
this.rsId = rsId;
- this.orderedResult = orderedResult;
+ this.metadata = metadata;
this.emptyResult = emptyResult;
this.partition = partition;
this.nPartitions = nPartitions;
@@ -55,13 +57,14 @@
@Override
public void run() {
-
ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId,
orderedResult, emptyResult,
+
ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId,
metadata, emptyResult,
partition, nPartitions, networkAddress);
}
@Override
public String toString() {
- return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + "
Partition@" + partition + " NPartitions@" + nPartitions
- + " ResultPartitionLocation@" + networkAddress + "
OrderedResult@" + orderedResult + " EmptyResult@" + emptyResult;
+ return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + "
Partition@" + partition + " NPartitions@"
+ + nPartitions + " ResultPartitionLocation@" + networkAddress +
" Metadata@" + metadata + " EmptyResult@"
+ + emptyResult;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index a0c0f95..5c3f6f2 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.common.base;
+import java.io.Serializable;
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
@@ -62,7 +63,7 @@
public void sendApplicationMessageToCC(byte[] data, DeploymentId
deploymentId, String nodeId) throws Exception;
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId,
boolean orderedResult,
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId,
Serializable metadata,
boolean emptyResult, int partition, int nPartitions,
NetworkAddress networkAddress) throws Exception;
public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId
rsId, int partition) throws Exception;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index aa9a4fe..219fc0e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -521,7 +521,7 @@
private final ResultSetId rsId;
- private final boolean orderedResult;
+ private final Serializable metadata;
private final boolean emptyResult;
@@ -531,11 +531,11 @@
private NetworkAddress networkAddress;
- public RegisterResultPartitionLocationFunction(JobId jobId,
ResultSetId rsId, boolean orderedResult,
+ public RegisterResultPartitionLocationFunction(JobId jobId,
ResultSetId rsId, Serializable metadata,
boolean emptyResult, int partition, int nPartitions,
NetworkAddress networkAddress) {
this.jobId = jobId;
this.rsId = rsId;
- this.orderedResult = orderedResult;
+ this.metadata = metadata;
this.emptyResult = emptyResult;
this.partition = partition;
this.nPartitions = nPartitions;
@@ -555,8 +555,8 @@
return rsId;
}
- public boolean getOrderedResult() {
- return orderedResult;
+ public Serializable getMetadata() {
+ return metadata;
}
public boolean getEmptyResult() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ac6fc2c..1eb755f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.common.ipc;
+import java.io.Serializable;
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
@@ -117,11 +118,11 @@
}
@Override
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId,
boolean orderedResult,
+ public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId,
Serializable metadata,
boolean emptyResult, int
partition, int nPartitions,
NetworkAddress networkAddress)
throws Exception {
CCNCFunctions.RegisterResultPartitionLocationFunction fn =
- new
CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId,
orderedResult, emptyResult,
+ new
CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId, metadata,
emptyResult,
partition, nPartitions, networkAddress);
ipcHandle.send(-1, fn, null);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index a594f95..91f2cce 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.nc.dataset;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -70,12 +71,12 @@
}
@Override
- public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx,
ResultSetId rsId, boolean orderedResult,
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx,
ResultSetId rsId, Serializable metadata,
boolean asyncMode, int partition, int nPartitions) throws
HyracksException {
DatasetPartitionWriter dpw = null;
JobId jobId = ctx.getJobletContext().getJobId();
synchronized (this) {
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId,
asyncMode, orderedResult, partition, nPartitions,
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId,
asyncMode, metadata, partition, nPartitions,
datasetMemoryManager, fileFactory);
ResultSetMap rsIdMap = (ResultSetMap)
partitionResultStateMap.get(jobId);
@@ -98,10 +99,10 @@
@Override
public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId,
int partition, int nPartitions,
- boolean orderedResult, boolean emptyResult) throws
HyracksException {
+ Serializable metadata, boolean emptyResult) throws
HyracksException {
try {
// Be sure to send the *public* network address to the CC
- ncs.getClusterController().registerResultPartitionLocation(jobId,
rsId, orderedResult, emptyResult,
+ ncs.getClusterController().registerResultPartitionLocation(jobId,
rsId, metadata, emptyResult,
partition, nPartitions,
ncs.getDatasetNetworkManager().getPublicNetworkAddress());
} catch (Exception e) {
throw new HyracksException(e);
@@ -257,7 +258,7 @@
private class ResultSetMap extends HashMap<ResultSetId, ResultState[]>
implements IDatasetStateRecord {
private static final long serialVersionUID = 1L;
- long timestamp;
+ private long timestamp;
public ResultSetMap() {
super();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index e007050..10dc459 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.nc.dataset;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -41,7 +42,7 @@
private final ResultSetId resultSetId;
- private final boolean orderedResult;
+ private final Serializable metadata;
private final int partition;
@@ -49,24 +50,22 @@
private final DatasetMemoryManager datasetMemoryManager;
- private final ResultSetPartitionId resultSetPartitionId;
-
private final ResultState resultState;
private boolean partitionRegistered;
public DatasetPartitionWriter(IHyracksTaskContext ctx,
IDatasetPartitionManager manager, JobId jobId,
- ResultSetId rsId, boolean asyncMode, boolean orderedResult, int
partition, int nPartitions,
+ ResultSetId rsId, boolean asyncMode, Serializable metadata, int
partition, int nPartitions,
DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory
fileFactory) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
- this.orderedResult = orderedResult;
+ this.metadata = metadata;
this.partition = partition;
this.nPartitions = nPartitions;
this.datasetMemoryManager = datasetMemoryManager;
- resultSetPartitionId = new ResultSetPartitionId(jobId, rsId,
partition);
+ ResultSetPartitionId resultSetPartitionId = new
ResultSetPartitionId(jobId, rsId, partition);
resultState = new ResultState(resultSetPartitionId, asyncMode,
ctx.getIOManager(), fileFactory,
ctx.getInitialFrameSize());
}
@@ -127,7 +126,7 @@
void registerResultPartitionLocation(boolean empty) throws
HyracksDataException {
try {
- manager.registerResultPartitionLocation(jobId, resultSetId,
partition, nPartitions, orderedResult, empty);
+ manager.registerResultPartitionLocation(jobId, resultSetId,
partition, nPartitions, metadata, empty);
} catch (HyracksException e) {
if (e instanceof HyracksDataException) {
throw (HyracksDataException) e;
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 90b4b6c..3390cd4 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.PrintStream;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.comm.IFrame;
@@ -46,17 +47,17 @@
private final ResultSetId rsId;
- private final boolean ordered;
+ private final Serializable metadata;
private final boolean asyncMode;
private final IResultSerializerFactory resultSerializerFactory;
- public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec,
ResultSetId rsId, boolean ordered,
+ public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec,
ResultSetId rsId, Serializable metadata,
boolean asyncMode, IResultSerializerFactory
resultSerializerFactory) throws IOException {
super(spec, 1, 0);
this.rsId = rsId;
- this.ordered = ordered;
+ this.metadata = metadata;
this.asyncMode = asyncMode;
this.resultSerializerFactory = resultSerializerFactory;
}
@@ -85,7 +86,7 @@
@Override
public void open() throws HyracksDataException {
try {
- datasetPartitionWriter =
dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
+ datasetPartitionWriter =
dpm.createDatasetPartitionWriter(ctx, rsId, metadata, asyncMode, partition,
nPartitions);
datasetPartitionWriter.open();
resultSerializer.init();
--
To view, visit https://asterix-gerrit.ics.uci.edu/1404
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifc4eddd23f508962483f3470e88760bf8124dc25
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>