Till Westmann has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1404

Change subject: WIP - add result format metadata for result sets
......................................................................

WIP - add result format metadata for result sets

- provide an arbitrary Serializable as result metadata (instead of a fixed
  boolean 'ordered' property)
- fix hashCode/equals for DatasetDirectoryRecord

Change-Id: Ifc4eddd23f508962483f3470e88760bf8124dc25
---
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
M 
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
21 files changed, 71 insertions(+), 55 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/04/1404/1

diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 61c1aff..88eb712 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -739,7 +740,7 @@
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getResultHandleRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, 
RecordDescriptor inputDesc, boolean ordered,
+            int[] printColumns, IPrinterFactory[] printerFactories, 
RecordDescriptor inputDesc, Serializable metadata,
             JobSpecification spec) throws AlgebricksException {
         ResultSetDataSink rsds = (ResultSetDataSink) sink;
         ResultSetSinkId rssId = rsds.getId();
@@ -748,7 +749,7 @@
         try {
             IResultSerializerFactory resultSerializedAppenderFactory = 
resultSerializerFactoryProvider
                     .getAqlResultSerializerFactoryProvider(printColumns, 
printerFactories, getWriterFactory());
-            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, 
ordered, getResultAsyncMode(),
+            resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, 
metadata, getResultAsyncMode(),
                     resultSerializedAppenderFactory);
         } catch (IOException e) {
             throw new AlgebricksException(e);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 5d1f402..004d8f7 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.metadata;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -56,7 +57,7 @@
             throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getResultHandleRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, 
RecordDescriptor inputDesc, boolean ordered,
+            int[] printColumns, IPrinterFactory[] printerFactories, 
RecordDescriptor inputDesc, Serializable metadata,
             JobSpecification spec) throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getWriteResultRuntime(IDataSource<S> dataSource,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index b3e8385..f1b584e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -104,7 +104,7 @@
                 context, columns);
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
runtimeAndConstraints = mp.getResultHandleRuntime(
-                resultOp.getDataSink(), columns, pf, inputDesc, true, spec);
+                resultOp.getDataSink(), columns, pf, inputDesc, null, spec);
 
         builder.contributeHyracksOperator(resultOp, 
runtimeAndConstraints.first);
         ILogicalOperator src = resultOp.getInputs().get(0).getValue();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 3f7f56d..70ee508 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.algebricks.examples.piglet.metadata;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -162,7 +163,7 @@
 
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getResultHandleRuntime(IDataSink sink,
-            int[] printColumns, IPrinterFactory[] printerFactories, 
RecordDescriptor inputDesc, boolean ordered,
+            int[] printColumns, IPrinterFactory[] printerFactories, 
RecordDescriptor inputDesc, Serializable metadata,
             JobSpecification spec) throws AlgebricksException {
         return null;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
index 752a6b3..64f7417 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
@@ -85,7 +85,7 @@
             return false;
         }
         NetworkAddress on = (NetworkAddress) o;
-        return on.port == port && on.address == address;
+        return on.port == port && on.address.equals(address);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
index a50e1ee..bb3a164 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java
@@ -87,14 +87,16 @@
     }
 
     @Override
+    public int hashCode() {
+        return address.hashCode();
+    }
+
+    @Override
     public boolean equals(Object o) {
         if (o == this) {
             return true;
         }
-        if (!(o instanceof DatasetDirectoryRecord)) {
-            return false;
-        }
-        return address.equals(((DatasetDirectoryRecord) o).address);
+        return o instanceof DatasetDirectoryRecord && 
address.equals(((DatasetDirectoryRecord) o).address);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index 34ed65c..4bb26c1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -58,6 +58,7 @@
         this.exceptions = exceptions;
     }
 
+    @Override
     public long getTimestamp() {
         return timestamp;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index fa22d8e..95da121 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.api.dataset;
 
+import java.io.Serializable;
+
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -25,11 +27,11 @@
 import org.apache.hyracks.api.job.JobId;
 
 public interface IDatasetPartitionManager extends IDatasetManager {
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, 
ResultSetId rsId, boolean orderedResult,
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, 
ResultSetId rsId, Serializable metadata,
             boolean asyncMode, int partition, int nPartitions) throws 
HyracksException;
 
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
int partition, int nPartitions,
-            boolean orderedResult, boolean emptyResult) throws 
HyracksException;
+            Serializable metadata, boolean emptyResult) throws 
HyracksException;
 
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId 
resultSetId, int partition)
             throws HyracksException;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
index d18e6cf..65f3ea6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetStateRecord.java
@@ -19,5 +19,5 @@
 package org.apache.hyracks.api.dataset;
 
 public interface IDatasetStateRecord {
-    public long getTimestamp();
+    long getTimestamp();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
index 2285981..e5cdcb3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java
@@ -18,18 +18,20 @@
  */
 package org.apache.hyracks.api.dataset;
 
+import java.io.Serializable;
+
 public class ResultSetMetaData {
-    private final boolean ordered;
+    private final Serializable metadata;
 
     private final DatasetDirectoryRecord[] records;
 
-    public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] 
records) {
-        this.ordered = ordered;
+    public ResultSetMetaData(Serializable metadata, DatasetDirectoryRecord[] 
records) {
+        this.metadata = metadata;
         this.records = records;
     }
 
-    public boolean getOrderedResult() {
-        return ordered;
+    public Serializable metadata() {
+        return metadata;
     }
 
     public DatasetDirectoryRecord[] getRecords() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
index f60b3c3..6226fde 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDataset.java
@@ -48,13 +48,11 @@
 
     @Override
     public IHyracksDatasetReader createReader(JobId jobId, ResultSetId 
resultSetId) throws HyracksDataException {
-        IHyracksDatasetReader reader = null;
         try {
-            reader = new 
HyracksDatasetReader(datasetDirectoryServiceConnection, netManager, 
datasetClientCtx, jobId,
+            return new HyracksDatasetReader(datasetDirectoryServiceConnection, 
netManager, datasetClientCtx, jobId,
                     resultSetId);
         } catch (Exception e) {
             throw new HyracksDataException(e);
         }
-        return reader;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index b6c9a08..95116b4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -115,7 +115,7 @@
                 CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
                         
(CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                 ccs.getWorkQueue().schedule(new 
RegisterResultPartitionLocationWork(ccs,
-                        rrplf.getJobId(), rrplf.getResultSetId(), 
rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+                        rrplf.getJobId(), rrplf.getResultSetId(), 
rrplf.getMetadata(), rrplf.getEmptyResult(),
                         rrplf.getPartition(), rrplf.getNPartitions(), 
rrplf.getNetworkAddress()));
                 break;
             case REPORT_RESULT_PARTITION_WRITE_COMPLETION:
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 4d7d1c3..52765a7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.cc.dataset;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -91,13 +92,13 @@
     }
 
     @Override
-    public synchronized void registerResultPartitionLocation(JobId jobId, 
ResultSetId rsId, boolean orderedResult,
+    public synchronized void registerResultPartitionLocation(JobId jobId, 
ResultSetId rsId, Serializable metadata,
             boolean emptyResult, int partition, int nPartitions, 
NetworkAddress networkAddress) {
         DatasetJobRecord djr = getDatasetJobRecord(jobId);
 
         ResultSetMetaData resultSetMetaData = djr.get(rsId);
         if (resultSetMetaData == null) {
-            resultSetMetaData = new ResultSetMetaData(orderedResult, new 
DatasetDirectoryRecord[nPartitions]);
+            resultSetMetaData = new ResultSetMetaData(metadata, new 
DatasetDirectoryRecord[nPartitions]);
             djr.put(rsId, resultSetMetaData);
         }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 9e4e03e..d39ae51 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.cc.dataset;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -34,7 +35,7 @@
 public interface IDatasetDirectoryService extends IJobLifecycleListener, 
IDatasetManager {
     public void init(ExecutorService executor);
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
boolean orderedResult,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
Serializable metadata,
             boolean emptyResult, int partition, int nPartitions, 
NetworkAddress networkAddress);
 
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId 
rsId, int partition);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index 4e4732d..95b5613 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.io.Serializable;
+
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.job.JobId;
@@ -31,7 +33,7 @@
 
     private final ResultSetId rsId;
 
-    private final boolean orderedResult;
+    private final Serializable metadata;
 
     private final boolean emptyResult;
 
@@ -42,11 +44,11 @@
     private final NetworkAddress networkAddress;
 
     public RegisterResultPartitionLocationWork(ClusterControllerService ccs, 
JobId jobId, ResultSetId rsId,
-            boolean orderedResult, boolean emptyResult, int partition, int 
nPartitions, NetworkAddress networkAddress) {
+            Serializable metadata, boolean emptyResult, int partition, int 
nPartitions, NetworkAddress networkAddress) {
         this.ccs = ccs;
         this.jobId = jobId;
         this.rsId = rsId;
-        this.orderedResult = orderedResult;
+        this.metadata = metadata;
         this.emptyResult = emptyResult;
         this.partition = partition;
         this.nPartitions = nPartitions;
@@ -55,13 +57,14 @@
 
     @Override
     public void run() {
-        
ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, 
orderedResult, emptyResult,
+        
ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, 
metadata, emptyResult,
                 partition, nPartitions, networkAddress);
     }
 
     @Override
     public String toString() {
-        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " 
Partition@" + partition + " NPartitions@" + nPartitions
-                + " ResultPartitionLocation@" + networkAddress + " 
OrderedResult@" + orderedResult + " EmptyResult@" + emptyResult;
+        return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " 
Partition@" + partition + " NPartitions@"
+                + nPartitions + " ResultPartitionLocation@" + networkAddress + 
" Metadata@" + metadata + " EmptyResult@"
+                + emptyResult;
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index a0c0f95..5c3f6f2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.base;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -62,7 +63,7 @@
 
     public void sendApplicationMessageToCC(byte[] data, DeploymentId 
deploymentId, String nodeId) throws Exception;
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
boolean orderedResult,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
Serializable metadata,
             boolean emptyResult, int partition, int nPartitions, 
NetworkAddress networkAddress) throws Exception;
 
     public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId 
rsId, int partition) throws Exception;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index aa9a4fe..219fc0e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -521,7 +521,7 @@
 
         private final ResultSetId rsId;
 
-        private final boolean orderedResult;
+        private final Serializable metadata;
 
         private final boolean emptyResult;
 
@@ -531,11 +531,11 @@
 
         private NetworkAddress networkAddress;
 
-        public RegisterResultPartitionLocationFunction(JobId jobId, 
ResultSetId rsId, boolean orderedResult,
+        public RegisterResultPartitionLocationFunction(JobId jobId, 
ResultSetId rsId, Serializable metadata,
                 boolean emptyResult, int partition, int nPartitions, 
NetworkAddress networkAddress) {
             this.jobId = jobId;
             this.rsId = rsId;
-            this.orderedResult = orderedResult;
+            this.metadata = metadata;
             this.emptyResult = emptyResult;
             this.partition = partition;
             this.nPartitions = nPartitions;
@@ -555,8 +555,8 @@
             return rsId;
         }
 
-        public boolean getOrderedResult() {
-            return orderedResult;
+        public Serializable getMetadata() {
+            return metadata;
         }
 
         public boolean getEmptyResult() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index ac6fc2c..1eb755f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.common.ipc;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -117,11 +118,11 @@
     }
 
     @Override
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
boolean orderedResult,
+    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
Serializable metadata,
                                                 boolean emptyResult, int 
partition, int nPartitions,
                                                 NetworkAddress networkAddress) 
throws Exception {
         CCNCFunctions.RegisterResultPartitionLocationFunction fn =
-                new 
CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId, 
orderedResult, emptyResult,
+                new 
CCNCFunctions.RegisterResultPartitionLocationFunction(jobId, rsId, metadata, 
emptyResult,
                         partition, nPartitions, networkAddress);
         ipcHandle.send(-1, fn, null);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index a594f95..91f2cce 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.nc.dataset;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -70,12 +71,12 @@
     }
 
     @Override
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, 
ResultSetId rsId, boolean orderedResult,
+    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, 
ResultSetId rsId, Serializable metadata,
             boolean asyncMode, int partition, int nPartitions) throws 
HyracksException {
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
         synchronized (this) {
-            dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, 
asyncMode, orderedResult, partition, nPartitions,
+            dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, 
asyncMode, metadata, partition, nPartitions,
                     datasetMemoryManager, fileFactory);
 
             ResultSetMap rsIdMap = (ResultSetMap) 
partitionResultStateMap.get(jobId);
@@ -98,10 +99,10 @@
 
     @Override
     public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, 
int partition, int nPartitions,
-            boolean orderedResult, boolean emptyResult) throws 
HyracksException {
+            Serializable metadata, boolean emptyResult) throws 
HyracksException {
         try {
             // Be sure to send the *public* network address to the CC
-            ncs.getClusterController().registerResultPartitionLocation(jobId, 
rsId, orderedResult, emptyResult,
+            ncs.getClusterController().registerResultPartitionLocation(jobId, 
rsId, metadata, emptyResult,
                     partition, nPartitions, 
ncs.getDatasetNetworkManager().getPublicNetworkAddress());
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -257,7 +258,7 @@
     private class ResultSetMap extends HashMap<ResultSetId, ResultState[]> 
implements IDatasetStateRecord {
         private static final long serialVersionUID = 1L;
 
-        long timestamp;
+        private long timestamp;
 
         public ResultSetMap() {
             super();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index e007050..10dc459 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.nc.dataset;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -41,7 +42,7 @@
 
     private final ResultSetId resultSetId;
 
-    private final boolean orderedResult;
+    private final Serializable metadata;
 
     private final int partition;
 
@@ -49,24 +50,22 @@
 
     private final DatasetMemoryManager datasetMemoryManager;
 
-    private final ResultSetPartitionId resultSetPartitionId;
-
     private final ResultState resultState;
 
     private boolean partitionRegistered;
 
     public DatasetPartitionWriter(IHyracksTaskContext ctx, 
IDatasetPartitionManager manager, JobId jobId,
-            ResultSetId rsId, boolean asyncMode, boolean orderedResult, int 
partition, int nPartitions,
+            ResultSetId rsId, boolean asyncMode, Serializable metadata, int 
partition, int nPartitions,
             DatasetMemoryManager datasetMemoryManager, IWorkspaceFileFactory 
fileFactory) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
-        this.orderedResult = orderedResult;
+        this.metadata = metadata;
         this.partition = partition;
         this.nPartitions = nPartitions;
         this.datasetMemoryManager = datasetMemoryManager;
 
-        resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, 
partition);
+        ResultSetPartitionId resultSetPartitionId = new 
ResultSetPartitionId(jobId, rsId, partition);
         resultState = new ResultState(resultSetPartitionId, asyncMode, 
ctx.getIOManager(), fileFactory,
                 ctx.getInitialFrameSize());
     }
@@ -127,7 +126,7 @@
 
     void registerResultPartitionLocation(boolean empty) throws 
HyracksDataException {
         try {
-            manager.registerResultPartitionLocation(jobId, resultSetId, 
partition, nPartitions, orderedResult, empty);
+            manager.registerResultPartitionLocation(jobId, resultSetId, 
partition, nPartitions, metadata, empty);
         } catch (HyracksException e) {
             if (e instanceof HyracksDataException) {
                 throw (HyracksDataException) e;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 90b4b6c..3390cd4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.IFrame;
@@ -46,17 +47,17 @@
 
     private final ResultSetId rsId;
 
-    private final boolean ordered;
+    private final Serializable metadata;
 
     private final boolean asyncMode;
 
     private final IResultSerializerFactory resultSerializerFactory;
 
-    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, 
ResultSetId rsId, boolean ordered,
+    public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, 
ResultSetId rsId, Serializable metadata,
             boolean asyncMode, IResultSerializerFactory 
resultSerializerFactory) throws IOException {
         super(spec, 1, 0);
         this.rsId = rsId;
-        this.ordered = ordered;
+        this.metadata = metadata;
         this.asyncMode = asyncMode;
         this.resultSerializerFactory = resultSerializerFactory;
     }
@@ -85,7 +86,7 @@
             @Override
             public void open() throws HyracksDataException {
                 try {
-                    datasetPartitionWriter = 
dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
+                    datasetPartitionWriter = 
dpm.createDatasetPartitionWriter(ctx, rsId, metadata, asyncMode, partition,
                             nPartitions);
                     datasetPartitionWriter.open();
                     resultSerializer.init();

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1404
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifc4eddd23f508962483f3470e88760bf8124dc25
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>

Reply via email to