abdullah alamoudi has submitted this change and it was merged.

Change subject: [NO ISSUE][RT] Abort tasks on local network failures
......................................................................
[NO ISSUE][RT] Abort tasks on local network failures - user model changes: no - storage format changes: no - interface changes: yes Add error code to IInputChannelMonitor.notifyFailure Details: - Previously, there was an assumption that all failures reported to an IInputChannelMonitor come from a remote task. - This assumption is not always true and could lead to jobs hanging. - To fix this, we report an error code indicating whether the failure is local or remote and if the failure is local then we fail the local task and report the failure to cc. Change-Id: I7ea5b9008383faaac7c563671242b03919090b35 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2806 Sonar-Qube: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java 15 files changed, 54 insertions(+), 26 deletions(-) Approvals: Jenkins: Verified; No violations found; ; Verified Murtadha Hubail: Looks good to me, approved diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java index 52509d3..559f49a 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/channels/IInputChannelMonitor.java @@ -19,9 +19,9 @@ package org.apache.hyracks.api.channels; public interface IInputChannelMonitor { - public void notifyFailure(IInputChannel channel); + void notifyFailure(IInputChannel channel, int errorCode); - public void notifyDataAvailability(IInputChannel channel, int nFrames); + void notifyDataAvailability(IInputChannel channel, int nFrames); - public void notifyEndOfStream(IInputChannel channel); + void notifyEndOfStream(IInputChannel channel); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 
b6d7cc7..09193d9 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -148,6 +148,7 @@ public static final int CANNOT_ADD_ELEMENT_TO_INVERTED_INDEX_SEARCH_RESULT = 112; public static final int UNDEFINED_INVERTED_LIST_MERGE_TYPE = 113; public static final int NODE_IS_NOT_ACTIVE = 114; + public static final int LOCAL_NETWORK_ERROR = 115; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index bf73b91..c704d7e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -131,6 +131,7 @@ 112 = Cannot add an element to an inverted-index search result. 113 = Undefined inverted-list merge type: %1$s 114 = Node (%1$s) is not active +115 = Local network error 10000 = The given rule collection %1$s is not an instance of the List class. 
10001 = Cannot compose partition constraint %1$s with %2$s diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java index 36c77ce..b1566f4 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java @@ -199,7 +199,7 @@ } @Override - public synchronized void notifyFailure(IInputChannel channel) { + public synchronized void notifyFailure(IInputChannel channel, int errorCode) { failed = true; notifyAll(); } diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java index 44c3d36..0f96a6e 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java @@ -141,7 +141,7 @@ @Override public void error(int ecode) { - monitor.notifyFailure(DatasetNetworkInputChannel.this); + monitor.notifyFailure(DatasetNetworkInputChannel.this, ecode); } } diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java index a831492..7e893f8 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java +++ 
b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java @@ -133,7 +133,7 @@ @Override public void error(int ecode) { - monitor.notifyFailure(NetworkInputChannel.this); + monitor.notifyFailure(NetworkInputChannel.this, ecode); } } diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java index 3016a7a..8bee56e 100644 --- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java +++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java @@ -96,7 +96,7 @@ @Override public void fail() throws HyracksDataException { - ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE); + ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE); } @Override @@ -105,7 +105,7 @@ } public void abort() { - ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_WRITE_ERROR_CODE); + ccb.getWriteInterface().getFullBufferAcceptor().error(AbstractChannelWriteInterface.REMOTE_ERROR_CODE); synchronized (NetworkOutputChannel.this) { aborted = true; NetworkOutputChannel.this.notifyAll(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 23a5abb..340924d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -309,7 +309,8 @@ if 
(!addPendingThread(thread)) { return; } - thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx); + thread.setName( + displayName + ":" + joblet.getJobId() + ":" + taskAttemptId + ":" + cIdx); thread.setPriority(Thread.MIN_PRIORITY); try { pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java index c902ad8..5ffed7e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java @@ -103,7 +103,7 @@ } @Override - public synchronized void notifyFailure(IInputChannel channel) { + public synchronized void notifyFailure(IInputChannel channel, int errorCode) { failed.set(true); notifyAll(); } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml index c78c98c..0ff425f 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml @@ -76,6 +76,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-net</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>com.e-movimento.tinytools</groupId> <artifactId>privilegedaccessor</artifactId> <version>1.2.2</version> diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java index bf9f575..5ce29c2 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java @@ -25,8 +25,10 @@ import org.apache.hyracks.api.comm.FrameHelper; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameReader; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,28 +40,35 @@ private boolean eos; - private boolean failed; + private int errorCode; public InputChannelFrameReader(IInputChannel channel) { this.channel = channel; availableFrames = 0; + errorCode = AbstractChannelWriteInterface.NO_ERROR_CODE; eos = false; - failed = false; } @Override public void open() throws HyracksDataException { } + private boolean hasFailed() { + return errorCode != AbstractChannelWriteInterface.NO_ERROR_CODE; + } + private synchronized boolean canGetNextBuffer() throws HyracksDataException { - while (!failed && !eos && availableFrames <= 0) { + while (!hasFailed() && !eos && availableFrames <= 0) { try { wait(); } catch (InterruptedException e) { throw HyracksDataException.create(e); } } - if (failed) { + if (hasFailed()) { + if (errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE) { + throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR); + } // Do not throw exception here to allow the root cause exception gets propagated to the master first. // Return false to allow the nextFrame(...) 
call to be a non-op. LOGGER.warn("Sender failed.. returning silently"); @@ -96,8 +105,7 @@ for (int i = 1; i < nBlocks; ++i) { if (!canGetNextBuffer()) { - throw new HyracksDataException( - "InputChannelReader is waiting for the new frames, but the input stream is finished"); + return false; } srcFrame = channel.getNextBuffer(); frame.getBuffer().put(srcFrame); @@ -116,8 +124,11 @@ } @Override - public synchronized void notifyFailure(IInputChannel channel) { - failed = true; + public synchronized void notifyFailure(IInputChannel channel, int errorCode) { + // Note: if a remote failure overwrites the value of localFailure, then we rely on + // the fact that the remote task will notify the cc of the failure. + // Otherwise, the local task must fail + this.errorCode = errorCode; notifyAll(); } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java index 3c0a06b..5a1d5f8 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/NonDeterministicChannelReader.java @@ -23,8 +23,10 @@ import org.apache.hyracks.api.channels.IInputChannel; import org.apache.hyracks.api.channels.IInputChannelMonitor; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.partitions.PartitionId; +import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelWriteInterface; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,6 +48,8 @@ private final BitSet closedSenders; private int lastReadSender; + + private boolean localFailure; 
public NonDeterministicChannelReader(int nSenderPartitions, BitSet expectedPartitions) { this.nSenderPartitions = nSenderPartitions; @@ -107,6 +111,9 @@ } if (!failSenders.isEmpty()) { LOGGER.warn("Sender failed.. returning silently"); + if (localFailure) { + throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR); + } // Do not throw exception here to allow the root cause exception gets propagated to the master first. // Return a negative value to allow the nextFrame(...) call to be a non-op. return -1; @@ -141,11 +148,15 @@ } @Override - public synchronized void notifyFailure(IInputChannel channel) { + public synchronized void notifyFailure(IInputChannel channel, int errorCode) { PartitionId pid = (PartitionId) channel.getAttachment(); int senderIndex = pid.getSenderIndex(); LOGGER.warn("Failure: " + pid.getConnectorDescriptorId() + " sender: " + senderIndex + " receiver: " + pid.getReceiverIndex()); + // Note: if a remote failure overwrites the value of localFailure, then we rely on + // the fact that the remote task will notify the cc of the failure. 
+ // Otherwise, the local task must fail + localFailure = errorCode == AbstractChannelWriteInterface.LOCAL_ERROR_CODE; failSenders.set(senderIndex); eosSenders.set(senderIndex); notifyAll(); diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java index ff8d451..31cb69f 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelReadInterface.java @@ -42,10 +42,6 @@ } } - public void reportError(int ecode) { - fba.error(ecode); - } - @Override public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) { fba = fullBufferAcceptor; diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java index 28c1a71..7ed9bfa 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java @@ -31,7 +31,9 @@ public abstract class AbstractChannelWriteInterface implements IChannelWriteInterface { - public static final int REMOTE_WRITE_ERROR_CODE = 1; + public static final int NO_ERROR_CODE = 0; + public static final int REMOTE_ERROR_CODE = 1; + public static final int LOCAL_ERROR_CODE = -1; private static final Logger LOGGER = LogManager.getLogger(); protected final IChannelControlBlock ccb; protected final Queue<ByteBuffer> wiFullQueue; @@ 
-136,7 +138,7 @@ return; } eos = true; - if (ecode != REMOTE_WRITE_ERROR_CODE) { + if (ecode != REMOTE_ERROR_CODE) { adjustChannelWritability(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java index c2fb9a8..49b9f7a 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java @@ -200,7 +200,7 @@ for (int i = 0; i < ccbArray.length; ++i) { ChannelControlBlock ccb = ccbArray[i]; if (ccb != null) { - ccb.reportRemoteError(-1); + ccb.reportRemoteError(AbstractChannelWriteInterface.LOCAL_ERROR_CODE); markEOSAck(i); unmarkPendingCredits(i); } -- To view, visit https://asterix-gerrit.ics.uci.edu/2806 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7ea5b9008383faaac7c563671242b03919090b35 Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
