Till Westmann has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1548
Change subject: WIP - re-read results
......................................................................
WIP - re-read results
Change-Id: I88fe289fe9109ea012c63d82af0083dce6bde31b
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json
A asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
18 files changed, 234 insertions(+), 28 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/48/1548/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 292dd2a..d8c39f4 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -27,6 +27,7 @@
import org.apache.asterix.app.result.ResultUtil;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionConfig;
+import org.apache.hyracks.api.dataset.DatasetJobRecord;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -67,6 +68,19 @@
IHyracksDataset hds = getHyracksDataset();
ResultReader resultReader = new ResultReader(hds, jobId, rsId);
+ DatasetJobRecord.Status status = resultReader.getStatus();
+ switch (status) {
+ case SUCCESS:
+ break;
+ case RUNNING:
+ case IDLE:
+ case FAILED:
+ response.setStatus(HttpResponseStatus.NOT_FOUND);
+ return;
+ default:
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ return;
+ }
// QQQ The output format is determined by the initial
// query and cannot be modified here, so calling back to
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 3531211..3e40c40 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -49,7 +49,7 @@
PrettyPrinter singleLine = new SingleLinePrettyPrinter();
ObjectNode result = om.readValue(resultStr, ObjectNode.class);
- LOGGER.fine("+++++++\n" + result + "\n+++++++\n");
+ System.err.println("+++++++\n" + result + "\n+++++++\n");
String type = "";
String status = "";
@@ -106,7 +106,7 @@
}
break;
default:
- throw new AsterixException(field + "unanticipated field");
+ throw new AsterixException("Unanticipated field \"" + field + "\"");
}
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
new file mode 100644
index 0000000..a44b911
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.1.async.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#handlevariable=handle
+
+select i, i * i as i2 from range(1, 10) i;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http
new file mode 100644
index 0000000..88e0861
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.2.pollget.http
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#polltimeoutsecs=100
+
+/query/status?handle=$handle
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http
new file mode 100644
index 0000000..a88991c
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.3.get.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/query/result?handle=$handle
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http
new file mode 100644
index 0000000..a88991c
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.4.get.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/query/result?handle=$handle
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp
new file mode 100644
index 0000000..e452678
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/async-repeated/async-repeated.5.query.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+select i, i * i as i2 from range(1, 10) i;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.1.ignore
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json
new file mode 100644
index 0000000..6213a6b
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.2.json
@@ -0,0 +1 @@
+{"status":"SUCCESS"}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.3.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.4.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json
new file mode 100644
index 0000000..09e86cc
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred/async-repeated/async-repeated.5.json
@@ -0,0 +1,10 @@
+{ "i": 1, "i2": 1 }
+{ "i": 2, "i2": 4 }
+{ "i": 3, "i2": 9 }
+{ "i": 4, "i2": 16 }
+{ "i": 5, "i2": 25 }
+{ "i": 6, "i2": 36 }
+{ "i": 7, "i2": 49 }
+{ "i": 8, "i2": 64 }
+{ "i": 9, "i2": 81 }
+{ "i": 10, "i2": 100 }
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 431b215..3005a22 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -39,6 +39,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="async-deferred">
+ <compilation-unit name="async-repeated">
+ <output-dir compare="Text">async-repeated</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
<compilation-unit name="async-running">
<output-dir compare="Text">async-running</output-dir>
</compilation-unit>
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
index f29ff4a..9bbe1c8 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.api.dataset;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -84,7 +86,7 @@
@Override
public String toString() {
- return resultSetMetadataMap.toString();
+ return Arrays.toString(resultSetMetadataMap.entrySet().toArray());
}
public List<Exception> getExceptions() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 98c0697..0adb330 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -83,7 +83,7 @@
}
@Override
- public void notifyJobStart(JobId jobId) throws HyracksException {
+ public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
jobResultLocations.get(jobId).getRecord().start();
}
@@ -169,19 +169,18 @@
}
@Override
- public Set<JobId> getJobIds() {
+ public synchronized Set<JobId> getJobIds() {
return jobResultLocations.keySet();
}
@Override
- public IDatasetStateRecord getState(JobId jobId) {
+ public synchronized IDatasetStateRecord getState(JobId jobId) {
return getDatasetJobRecord(jobId);
}
@Override
- public void deinitState(JobId jobId) {
- // See ASTERIXDB-1614 - DatasetDirectoryService.deinitState() fix intermittently fails
- // jobResultLocations.remove(jobId);
+ public synchronized void deinitState(JobId jobId) {
+ jobResultLocations.remove(jobId);
}
@Override
@@ -277,6 +276,11 @@
}
}
}
+
+ @Override
+ public String toString() {
+ return record.toString();
+ }
}
class Waiters extends HashMap<ResultSetId, Waiter> {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 5fac823..a82f850 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -124,31 +124,30 @@
@Override
public void initializeDatasetPartitionReader(JobId jobId, ResultSetId
resultSetId, int partition,
IFrameWriter writer) throws HyracksException {
- ResultState resultState;
- synchronized (this) {
- ResultSetMap rsIdMap = (ResultSetMap)
partitionResultStateMap.get(jobId);
-
- if (rsIdMap == null) {
- throw new HyracksException("Unknown JobId " + jobId);
- }
-
- ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
- if (resultStates == null) {
- throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
- }
-
- resultState = resultStates[partition];
- if (resultState == null) {
- throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
- }
- }
-
+ ResultState resultState = getResultState(jobId, resultSetId,
partition);
DatasetPartitionReader dpr = new DatasetPartitionReader(this,
datasetMemoryManager, executor, resultState);
dpr.writeTo(writer);
LOGGER.fine("Initialized partition reader: JobId: " + jobId +
":ResultSetId: " + resultSetId + ":partition: "
+ partition);
}
+ protected synchronized ResultState getResultState(JobId jobId, ResultSetId
resultSetId, int partition)
+ throws HyracksException {
+ ResultSetMap rsIdMap = (ResultSetMap)
partitionResultStateMap.get(jobId);
+ if (rsIdMap == null) {
+ throw new HyracksException("Unknown JobId " + jobId);
+ }
+ ResultState[] resultStates = rsIdMap.getResultStates(resultSetId);
+ if (resultStates == null) {
+ throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
+ }
+ ResultState resultState = resultStates[partition];
+ if (resultState == null) {
+ throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+ }
+ return resultState;
+ }
+
@Override
public synchronized void removePartition(JobId jobId, ResultSetId
resultSetId, int partition) {
ResultSetMap rsIdMap = (ResultSetMap)
partitionResultStateMap.get(jobId);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
index c501b5b..bc31bd4 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/ResultState.java
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
import org.apache.hyracks.api.dataflow.state.IStateObject;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,6 +38,8 @@
public class ResultState implements IStateObject {
private static final String FILE_PREFIX = "result_";
+
+ private static final Logger LOGGER =
Logger.getLogger(ResultState.class.getName());
private final ResultSetPartitionId resultSetPartitionId;
@@ -77,20 +80,24 @@
fileRef = null;
writeFileHandle = null;
+ System.err.println("XXX created " + toString());
}
public synchronized void open() {
+ System.err.println("XXX open " + toString());
size = 0;
persistentSize = 0;
}
public synchronized void close() {
+ System.err.println("XXX close " + toString());
eos.set(true);
closeWriteFileHandle();
notifyAll();
}
public synchronized void closeAndDelete() {
+ System.err.println("XXX closeAndDelete " + toString());
// Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
// to be taken when there are more requests to these result states.
failed.set(true);
@@ -107,10 +114,12 @@
} catch (IOException e) {
// Since file handle could not be closed, just ignore.
}
+ writeFileHandle = null;
}
}
public synchronized void write(ByteBuffer buffer) throws
HyracksDataException {
+ System.err.println("XXX write1 " + toString());
if (fileRef == null) {
String fName = FILE_PREFIX +
String.valueOf(resultSetPartitionId.getPartition());
fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
@@ -124,6 +133,7 @@
public synchronized void write(DatasetMemoryManager datasetMemoryManager,
ByteBuffer buffer)
throws HyracksDataException {
+ System.err.println("XXX write2 " + toString());
int srcOffset = 0;
Page destPage = null;
@@ -146,16 +156,20 @@
}
public synchronized void readOpen() {
+ System.err.println("XXX readOpen " + toString());
// It is a noOp for now, leaving here to keep the API stable for future usage.
}
public synchronized void readClose() throws HyracksDataException {
+ System.err.println("XXX readClose " + toString());
if (readFileHandle != null) {
ioManager.close(readFileHandle);
+ readFileHandle = null;
}
}
public synchronized long read(long offset, ByteBuffer buffer) throws
HyracksDataException {
+ System.err.println("XXX read1 " + toString());
long readSize = 0;
while (offset >= size && !eos.get() && !failed.get()) {
@@ -179,6 +193,7 @@
public long read(DatasetMemoryManager datasetMemoryManager, long offset,
ByteBuffer buffer)
throws HyracksDataException {
+ System.err.println("XXX read2 " + toString());
long readSize = 0;
synchronized (this) {
while (offset >= size && !eos.get() && !failed.get()) {
@@ -220,11 +235,13 @@
}
public synchronized void abort() {
+ System.err.println("XXX abort " + toString());
failed.set(true);
notifyAll();
}
public synchronized Page returnPage() throws HyracksDataException {
+ System.err.println("XXX returnPage " + toString());
Page page = removePage();
// If we do not have any pages to be given back close the write channel since we don't write any more, return null.
@@ -324,4 +341,16 @@
readFileHandle = ioManager.open(fileRef,
IIOManager.FileReadWriteMode.READ_ONLY,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ ");
+ sb.append('"').append("rspid").append("\":\"").append(resultSetPartitionId).append("\", ");
+ sb.append('"').append("async").append("\":").append(asyncMode).append(", ");
+ sb.append('"').append("eos").append("\":").append(eos).append(", ");
+ sb.append('"').append("failed").append("\":").append(failed).append(", ");
+ sb.append('"').append("fileRef").append("\":\"").append(String.valueOf(fileRef)).append("\" }");
+ return sb.toString();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index b422ef4..a8d699e 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
+import java.util.logging.Logger;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -42,6 +43,9 @@
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
public class ResultWriterOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
+ private static final Logger LOGGER =
Logger.getLogger(ResultWriterOperatorDescriptor.class.getName());
+
+
private static final long serialVersionUID = 1L;
private final ResultSetId rsId;
@@ -85,6 +89,7 @@
@Override
public void open() throws HyracksDataException {
+ System.err.println("XXX open " + toString());
try {
datasetPartitionWriter =
dpm.createDatasetPartitionWriter(ctx, rsId, ordered, asyncMode, partition,
nPartitions);
@@ -97,6 +102,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
+ System.err.println("XXX nextFrame " + toString());
frameTupleAccessor.reset(buffer);
for (int tIndex = 0; tIndex <
frameTupleAccessor.getTupleCount(); tIndex++) {
resultSerializer.appendTuple(frameTupleAccessor, tIndex);
@@ -111,12 +117,14 @@
@Override
public void fail() throws HyracksDataException {
+ System.err.println("XXX fail " + toString());
failed = true;
datasetPartitionWriter.fail();
}
@Override
public void close() throws HyracksDataException {
+ System.err.println("XXX close " + toString());
try {
if (!failed && frameOutputStream.getTupleCount() > 0) {
frameOutputStream.flush(datasetPartitionWriter);
@@ -128,6 +136,16 @@
datasetPartitionWriter.close();
}
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ ");
+ sb.append("\"rsId\": \"").append(rsId).append("\", ");
+ sb.append("\"ordered\": ").append(ordered).append(", ");
+ sb.append("\"asyncMode\": ").append(asyncMode).append(" }");
+ return sb.toString();
+ }
};
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1548
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I88fe289fe9109ea012c63d82af0083dce6bde31b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>