Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-1706][RT] Ensure All Result Frames Are Read ......................................................................
[ASTERIXDB-1706][RT] Ensure All Result Frames Are Read - user model changes: no - storage format changes: no - interface changes: yes - Removed IDatasetInputChannelMonitor Details: - Currently there is a possibility that the EOS comes in DatasetInputChannelMonitor right after the check for available frames is performed which will result in missing some result frames from being read. When this happens, empty result will be returned if no frames were read before. This change ensures that the state between checking the available frames and the EOS is consistent. - Clean up HyracksDatasetReader. - Use error code for result read failure. Change-Id: I7d5a78fa20fe200cfffd21a215e052481c6d61ca Reviewed-on: https://asterix-gerrit.ics.uci.edu/2337 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java D hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetInputChannelMonitor.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java 6 files changed, 121 insertions(+), 176 deletions(-) Approvals: Anon. E. 
Moose #1000171: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; Verified diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml index fe030a8..1ae94ec 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml @@ -52,7 +52,7 @@ <test-case FilePath="async-deferred"> <compilation-unit name="async-exhausted-result"> <output-dir compare="Text">async-exhausted-result</output-dir> - <expected-error>Job Failed</expected-error> + <expected-error>HYR0093</expected-error> </compilation-unit> </test-case> </test-group> diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java index 3165840..e47b1e2 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java @@ -58,7 +58,7 @@ this.empty = empty; } - public boolean getEmpty() { + public boolean isEmpty() { return empty; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetInputChannelMonitor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetInputChannelMonitor.java deleted file mode 100644 index bce321f..0000000 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetInputChannelMonitor.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.api.dataset; - -import org.apache.hyracks.api.channels.IInputChannelMonitor; - -public interface IDatasetInputChannelMonitor extends IInputChannelMonitor { - public boolean eosReached(); - - public boolean failed(); - - public int getNFramesAvailable(); - - public void notifyFrameRead(); -} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 35fdb2e..8b47171 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -126,7 +126,7 @@ public static final int ILLEGAL_MEMORY_BUDGET = 90; public static final int TIMEOUT = 91; public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92; - // 93 + public static final int FAILED_TO_READ_RESULT = 93; public static final int CANNOT_READ_CLOSED_FILE = 94; public static final int TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME = 95; public static final int ILLEGAL_ATTEMPT_TO_ENTER_EMPTY_COMPONENT = 96; diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 465a661..a27a736 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -109,7 +109,7 @@ 90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes) 91 = Operation timed out 92 = Job %1$s has been cleared from job history -# 93 +93 = Failed to read result for job %1$s 94 = Cannot read closed file (%1$s) 95 = Tuple of size %1$s cannot fit into an empty frame 96 = Illegal attempt to enter empty component diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java index e7c9042..36c77ce 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java @@ -23,75 +23,59 @@ import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.hyracks.api.channels.IInputChannel; +import org.apache.hyracks.api.channels.IInputChannelMonitor; import org.apache.hyracks.api.comm.FrameHelper; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.context.IHyracksCommonContext; import org.apache.hyracks.api.dataset.DatasetDirectoryRecord; import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; -import org.apache.hyracks.api.dataset.IDatasetInputChannelMonitor; import 
org.apache.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection; import org.apache.hyracks.api.dataset.IHyracksDatasetReader; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.client.net.ClientNetworkManager; import org.apache.hyracks.comm.channels.DatasetNetworkInputChannel; +import org.apache.hyracks.util.annotations.NotThreadSafe; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -// TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client? +@NotThreadSafe public class HyracksDatasetReader implements IHyracksDatasetReader { + private static final Logger LOGGER = LogManager.getLogger(); - - private final IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection; - + private static final int NUM_READ_BUFFERS = 1; + private final IHyracksDatasetDirectoryServiceConnection datasetDirectory; private final ClientNetworkManager netManager; - private final IHyracksCommonContext datasetClientCtx; - - private JobId jobId; - - private ResultSetId resultSetId; - + private final JobId jobId; + private final ResultSetId resultSetId; private DatasetDirectoryRecord[] knownRecords; + private DatasetInputChannelMonitor[] monitors; + private DatasetInputChannelMonitor currentRecordMonitor; + private DatasetNetworkInputChannel currentRecordChannel; + private int currentRecord; - private IDatasetInputChannelMonitor[] monitors; - - private int lastReadPartition; - - private IDatasetInputChannelMonitor lastMonitor; - - private DatasetNetworkInputChannel resultChannel; - - private static int NUM_READ_BUFFERS = 1; - - public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection datasetDirectoryServiceConnection, + 
public HyracksDatasetReader(IHyracksDatasetDirectoryServiceConnection datasetDirectory, ClientNetworkManager netManager, IHyracksCommonContext datasetClientCtx, JobId jobId, - ResultSetId resultSetId) throws Exception { - this.datasetDirectoryServiceConnection = datasetDirectoryServiceConnection; + ResultSetId resultSetId) { + this.datasetDirectory = datasetDirectory; this.netManager = netManager; this.datasetClientCtx = datasetClientCtx; this.jobId = jobId; this.resultSetId = resultSetId; - knownRecords = null; - monitors = null; - lastReadPartition = -1; - lastMonitor = null; - resultChannel = null; + currentRecord = -1; } @Override public Status getResultStatus() { try { - return datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId); + return datasetDirectory.getDatasetResultStatus(jobId, resultSetId); } catch (HyracksDataException e) { if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) { LOGGER.log(Level.WARN, "Exception retrieving result set for job " + jobId, e); @@ -102,107 +86,55 @@ return null; } - private DatasetDirectoryRecord getRecord(int partition) throws Exception { - while (knownRecords == null || knownRecords[partition] == null) { - knownRecords = - datasetDirectoryServiceConnection.getDatasetResultLocations(jobId, resultSetId, knownRecords); - } - return knownRecords[partition]; - } - - private boolean nextPartition() throws HyracksDataException { - ++lastReadPartition; - try { - DatasetDirectoryRecord record = getRecord(lastReadPartition); - while (record.getEmpty() && (++lastReadPartition) < knownRecords.length) { - record = getRecord(lastReadPartition); - } - if (lastReadPartition == knownRecords.length) { - return false; - } - resultChannel = new DatasetNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId, - lastReadPartition, NUM_READ_BUFFERS); - lastMonitor = getMonitor(lastReadPartition); - resultChannel.registerMonitor(lastMonitor); - resultChannel.open(datasetClientCtx); - return true; - } 
catch (Exception e) { - throw HyracksDataException.create(e); - } - } - @Override public int read(IFrame frame) throws HyracksDataException { frame.reset(); - ByteBuffer readBuffer; int readSize = 0; - - if (lastReadPartition == -1) { - if (!nextPartition()) { - return readSize; - } + if (isFirstRead() && !hasNextRecord()) { + return readSize; } - - while (readSize < frame.getFrameSize() - && !((lastReadPartition == knownRecords.length - 1) && isPartitionReadComplete(lastMonitor))) { - waitForNextFrame(lastMonitor); - if (isPartitionReadComplete(lastMonitor)) { - knownRecords[lastReadPartition].readEOS(); - resultChannel.close(); - if ((lastReadPartition == knownRecords.length - 1) || !nextPartition()) { + // read until frame is full or all dataset records have been read + while (readSize < frame.getFrameSize()) { + if (currentRecordMonitor.hasMoreFrames()) { + final ByteBuffer readBuffer = currentRecordChannel.getNextBuffer(); + if (readBuffer == null) { + throw new IllegalStateException("Unexpected empty frame"); + } + currentRecordMonitor.notifyFrameRead(); + if (readSize == 0) { + final int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer); + frame.ensureFrameSize(frame.getMinSize() * nBlocks); + frame.getBuffer().clear(); + } + frame.getBuffer().put(readBuffer); + currentRecordChannel.recycleBuffer(readBuffer); + readSize = frame.getBuffer().position(); + } else { + currentRecordChannel.close(); + if (currentRecordMonitor.failed()) { + throw HyracksDataException.create(ErrorCode.FAILED_TO_READ_RESULT, jobId); + } + if (isLastRecord() || !hasNextRecord()) { break; } - } else { - readBuffer = resultChannel.getNextBuffer(); - lastMonitor.notifyFrameRead(); - if (readBuffer != null) { - if (readSize <= 0) { - int nBlocks = FrameHelper.deserializeNumOfMinFrame(readBuffer); - frame.ensureFrameSize(frame.getMinSize() * nBlocks); - frame.getBuffer().clear(); - frame.getBuffer().put(readBuffer); - resultChannel.recycleBuffer(readBuffer); - readSize = 
frame.getBuffer().position(); - } else { - frame.getBuffer().put(readBuffer); - resultChannel.recycleBuffer(readBuffer); - readSize = frame.getBuffer().position(); - } - } } } - frame.getBuffer().flip(); return readSize; } - private static void waitForNextFrame(IDatasetInputChannelMonitor monitor) throws HyracksDataException { - synchronized (monitor) { - while (monitor.getNFramesAvailable() <= 0 && !monitor.eosReached() && !monitor.failed()) { - try { - monitor.wait(); - } catch (InterruptedException e) { - throw new HyracksDataException(e); - } - } - } - if (monitor.failed()) { - throw new HyracksDataException("Job Failed."); + private SocketAddress getSocketAddress(DatasetDirectoryRecord record) throws HyracksDataException { + try { + final NetworkAddress netAddr = record.getNetworkAddress(); + return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort()); + } catch (UnknownHostException e) { + throw HyracksDataException.create(e); } } - private boolean isPartitionReadComplete(IDatasetInputChannelMonitor monitor) { - return (monitor.getNFramesAvailable() <= 0) && (monitor.eosReached()); - } - - private SocketAddress getSocketAddress(DatasetDirectoryRecord addr) throws UnknownHostException { - NetworkAddress netAddr = addr.getNetworkAddress(); - return new InetSocketAddress(InetAddress.getByAddress(netAddr.lookupIpAddress()), netAddr.getPort()); - } - - private IDatasetInputChannelMonitor getMonitor(int partition) throws HyracksException { + private DatasetInputChannelMonitor getMonitor(int partition) { if (knownRecords == null || knownRecords[partition] == null) { - throw new HyracksException("Accessing monitors before the obtaining the corresponding addresses."); + throw new IllegalStateException("Accessing monitors before obtaining the corresponding addresses"); } if (monitors == null) { monitors = new DatasetInputChannelMonitor[knownRecords.length]; @@ -213,56 +145,100 @@ return monitors[partition]; } - private class 
DatasetInputChannelMonitor implements IDatasetInputChannelMonitor { - private final AtomicInteger nAvailableFrames; + private boolean hasNextRecord() throws HyracksDataException { + currentRecord++; + DatasetDirectoryRecord record = getRecord(currentRecord); + // skip empty records + while (record.isEmpty() && ++currentRecord < knownRecords.length) { + record = getRecord(currentRecord); + } + if (currentRecord == knownRecords.length) { + // exhausted all known records + return false; + } + requestRecordData(record); + return true; + } - private final AtomicBoolean eos; + private DatasetDirectoryRecord getRecord(int recordNum) throws HyracksDataException { + try { + while (knownRecords == null || knownRecords[recordNum] == null) { + knownRecords = datasetDirectory.getDatasetResultLocations(jobId, resultSetId, knownRecords); + } + return knownRecords[recordNum]; + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } - private final AtomicBoolean failed; + private void requestRecordData(DatasetDirectoryRecord record) throws HyracksDataException { + currentRecordChannel = new DatasetNetworkInputChannel(netManager, getSocketAddress(record), jobId, resultSetId, + currentRecord, NUM_READ_BUFFERS); + currentRecordMonitor = getMonitor(currentRecord); + currentRecordChannel.registerMonitor(currentRecordMonitor); + currentRecordChannel.open(datasetClientCtx); + } - public DatasetInputChannelMonitor() { - nAvailableFrames = new AtomicInteger(0); - eos = new AtomicBoolean(false); - failed = new AtomicBoolean(false); + private boolean isFirstRead() { + return currentRecord == -1; + } + + private boolean isLastRecord() { + return knownRecords != null && currentRecord == knownRecords.length - 1; + } + + private static class DatasetInputChannelMonitor implements IInputChannelMonitor { + + private int availableFrames; + private boolean eos; + private boolean failed; + + DatasetInputChannelMonitor() { + eos = false; + failed = false; } @Override public synchronized 
void notifyFailure(IInputChannel channel) { - failed.set(true); + failed = true; notifyAll(); } @Override public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) { - nAvailableFrames.addAndGet(nFrames); + availableFrames += nFrames; notifyAll(); } @Override public synchronized void notifyEndOfStream(IInputChannel channel) { - eos.set(true); + eos = true; notifyAll(); } - @Override - public synchronized boolean eosReached() { - return eos.get(); + synchronized boolean failed() { + return failed; } - @Override - public synchronized boolean failed() { - return failed.get(); + synchronized void notifyFrameRead() { + availableFrames--; + notifyAll(); } - @Override - public synchronized int getNFramesAvailable() { - return nAvailableFrames.get(); + synchronized boolean hasMoreFrames() throws HyracksDataException { + while (!failed && !eos && availableFrames == 0) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); + } + } + return !failed && !isFullyConsumed(); } - @Override - public synchronized void notifyFrameRead() { - nAvailableFrames.decrementAndGet(); + private synchronized boolean isFullyConsumed() { + return availableFrames == 0 && eos; } - } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2337 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7d5a78fa20fe200cfffd21a215e052481c6d61ca Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
