NicoK closed pull request #6470: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
URL: https://github.com/apache/flink/pull/6470
This is a PR merged from a forked repository. Because GitHub hides the original diff of a pull request from a fork once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java index 0413caa8aec..c78abb5165a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java @@ -69,6 +69,8 @@ int getNumberOfInputChannels(); + String getOwningTaskName(); + boolean isFinished(); void requestPartitions() throws IOException, InterruptedException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index 06e80ff531a..2e7d076f3f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -62,10 +62,10 @@ /** * An input gate consumes one or more partitions of a single produced intermediate result. * - * <p> Each intermediate result is partitioned over its producing parallel subtasks; each of these + * <p>Each intermediate result is partitioned over its producing parallel subtasks; each of these * partitions is furthermore partitioned into one or more subpartitions. 
* - * <p> As an example, consider a map-reduce program, where the map operator produces data and the + * <p>As an example, consider a map-reduce program, where the map operator produces data and the * reduce operator consumes the produced data. * * <pre>{@code @@ -74,7 +74,7 @@ * +-----+ +---------------------+ +--------+ * }</pre> * - * <p> When deploying such a program in parallel, the intermediate result will be partitioned over its + * <p>When deploying such a program in parallel, the intermediate result will be partitioned over its * producing parallel subtasks; each of these partitions is furthermore partitioned into one or more * subpartitions. * @@ -95,7 +95,7 @@ * +-----------------------------------------+ * }</pre> * - * <p> In the above example, two map subtasks produce the intermediate result in parallel, resulting + * <p>In the above example, two map subtasks produce the intermediate result in parallel, resulting * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two * subpartitions -- one for each parallel reduce subtask. 
*/ @@ -274,6 +274,11 @@ public int getNumberOfQueuedBuffers() { return 0; } + @Override + public String getOwningTaskName() { + return owningTaskName; + } + // ------------------------------------------------------------------------ // Setup/Life-cycle // ------------------------------------------------------------------------ @@ -364,7 +369,7 @@ else if (partitionLocation.isRemote()) { throw new IllegalStateException("Tried to update unknown channel with unknown channel."); } - LOG.debug("Updated unknown input channel to {}.", newChannel); + LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel); inputChannels.put(partitionId, newChannel); @@ -393,7 +398,7 @@ public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId) checkNotNull(ch, "Unknown input channel with ID " + partitionId); - LOG.debug("Retriggering partition request {}:{}.", ch.partitionId, consumedSubpartitionIndex); + LOG.debug("{}: Retriggering partition request {}:{}.", owningTaskName, ch.partitionId, consumedSubpartitionIndex); if (ch.getClass() == RemoteInputChannel.class) { final RemoteInputChannel rch = (RemoteInputChannel) ch; @@ -432,7 +437,8 @@ public void releaseAllResources() throws IOException { inputChannel.releaseAllResources(); } catch (IOException e) { - LOG.warn("Error during release of channel resources: " + e.getMessage(), e); + LOG.warn("{}: Error during release of channel resources: {}.", + owningTaskName, e.getMessage(), e); } } @@ -725,7 +731,8 @@ else if (partitionLocation.isUnknown()) { inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]); } - LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).", + LOG.debug("{}: Created {} input channels (local: {}, remote: {}, unknown: {}).", + owningTaskName, inputChannels.length, numLocalChannels, numRemoteChannels, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 742592a93f0..d3085cb5279 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -129,6 +129,12 @@ public int getNumberOfInputChannels() { return totalNumberOfInputChannels; } + @Override + public String getOwningTaskName() { + // all input gates have the same owning task + return inputGates[0].getOwningTaskName(); + } + @Override public boolean isFinished() { for (InputGate inputGate : inputGates) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 78852b82fdc..991635a9205 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -213,7 +213,7 @@ else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) { } private void completeBufferedSequence() throws IOException { - LOG.debug("Finished feeding back buffered data"); + LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName()); currentBuffered.cleanup(); currentBuffered = queuedBuffered.pollFirst(); @@ -247,8 +247,11 @@ private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) } else if (barrierId > currentCheckpointId) { // we did not complete the current checkpoint, another started before - LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " + - "Skipping current checkpoint.", barrierId, currentCheckpointId); + LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. 
" + + "Skipping current checkpoint.", + inputGate.getOwningTaskName(), + barrierId, + currentCheckpointId); // let the task know we are not completing this notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); @@ -279,8 +282,10 @@ else if (barrierId > currentCheckpointId) { if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) { // actually trigger checkpoint if (LOG.isDebugEnabled()) { - LOG.debug("Received all barriers, triggering checkpoint {} at {}", - receivedBarrier.getId(), receivedBarrier.getTimestamp()); + LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.", + inputGate.getOwningTaskName(), + receivedBarrier.getId(), + receivedBarrier.getTimestamp()); } releaseBlocksAndResetBarriers(); @@ -309,7 +314,9 @@ private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) th if (barrierId == currentCheckpointId) { // cancel this alignment if (LOG.isDebugEnabled()) { - LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId); + LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", + inputGate.getOwningTaskName(), + barrierId); } releaseBlocksAndResetBarriers(); @@ -317,8 +324,11 @@ private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) th } else if (barrierId > currentCheckpointId) { // we canceled the next which also cancels the current - LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " + - "Skipping current checkpoint.", barrierId, currentCheckpointId); + LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. 
" + + "Skipping current checkpoint.", + inputGate.getOwningTaskName(), + barrierId, + currentCheckpointId); // this stops the current alignment releaseBlocksAndResetBarriers(); @@ -347,7 +357,9 @@ else if (barrierId > currentCheckpointId) { latestAlignmentDurationNanos = 0L; if (LOG.isDebugEnabled()) { - LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId); + LOG.debug("{}: Checkpoint {} canceled, skipping alignment.", + inputGate.getOwningTaskName(), + barrierId); } notifyAbortOnCancellationBarrier(barrierId); @@ -401,8 +413,10 @@ private void notifyAbort(long checkpointId, CheckpointDeclineException cause) th private void checkSizeLimit() throws Exception { if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) { // exceeded our limit - abort this checkpoint - LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded", - currentCheckpointId, maxBufferedBytes); + LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.", + inputGate.getOwningTaskName(), + currentCheckpointId, + maxBufferedBytes); releaseBlocksAndResetBarriers(); notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes)); @@ -444,7 +458,9 @@ private void beginNewAlignment(long checkpointId, int channelIndex) throws IOExc startOfAlignmentTimestamp = System.nanoTime(); if (LOG.isDebugEnabled()) { - LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.'); + LOG.debug("{}: Starting stream alignment for checkpoint {}.", + inputGate.getOwningTaskName(), + checkpointId); } } @@ -470,7 +486,9 @@ private void onBarrier(int channelIndex) throws IOException { numBarriersReceived++; if (LOG.isDebugEnabled()) { - LOG.debug("Received barrier from channel " + channelIndex); + LOG.debug("{}: Received barrier from channel {}.", + inputGate.getOwningTaskName(), + channelIndex); } } else { @@ -483,7 +501,8 @@ private void onBarrier(int channelIndex) throws 
IOException { * Makes sure the just written data is the next to be consumed. */ private void releaseBlocksAndResetBarriers() throws IOException { - LOG.debug("End of stream alignment, feeding buffered data back"); + LOG.debug("{}: End of stream alignment, feeding buffered data back.", + inputGate.getOwningTaskName()); for (int i = 0; i < blockedChannels.length; i++) { blockedChannels[i] = false; @@ -499,8 +518,9 @@ private void releaseBlocksAndResetBarriers() throws IOException { else { // uncommon case: buffered data pending // push back the pending data, if we have any - LOG.debug("Checkpoint skipped via buffered data:" + - "Pushing back current alignment buffers and feeding back new alignment data first."); + LOG.debug("{}: Checkpoint skipped via buffered data:" + + "Pushing back current alignment buffers and feeding back new alignment data first.", + inputGate.getOwningTaskName()); // since we did not fully drain the previous sequence, we need to allocate a new buffer for this one BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources(); @@ -513,8 +533,9 @@ private void releaseBlocksAndResetBarriers() throws IOException { } if (LOG.isDebugEnabled()) { - LOG.debug("Size of buffered data: {} bytes", - currentBuffered == null ? 0L : currentBuffered.size()); + LOG.debug("{}: Size of buffered data: {} bytes", + inputGate.getOwningTaskName(), + currentBuffered == null ? 
0L : currentBuffered.size()); } // the next barrier that comes must assume it is the first @@ -555,7 +576,10 @@ public long getAlignmentDurationNanos() { @Override public String toString() { - return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d", - currentCheckpointId, numBarriersReceived, numClosedChannels); + return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d", + inputGate.getOwningTaskName(), + currentCheckpointId, + numBarriersReceived, + numClosedChannels); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index 6dd1e5ef2d6..e968101ca2d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -139,11 +139,18 @@ public boolean isNextBarrier() { private int currentChannel = 0; private long c = 0; + private final String owningTaskName; + public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) { + this(bufferPools, barrierGens, "TestTask"); + } + + public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens, String owningTaskName) { this.numChannels = bufferPools.length; this.currentBarriers = new int[numChannels]; this.bufferPools = bufferPools; this.barrierGens = barrierGens; + this.owningTaskName = owningTaskName; } @Override @@ -151,6 +158,11 @@ public int getNumberOfInputChannels() { return numChannels; } + @Override + public String getOwningTaskName() { + return owningTaskName; + } + @Override public boolean isFinished() { return false; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java index e62b709419f..6400a175e20 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java @@ -44,11 +44,18 @@ private int closedChannels; + private final String owningTaskName; + public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents) { + this(pageSize, numChannels, bufferOrEvents, "MockTask"); + } + + public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> bufferOrEvents, String owningTaskName) { this.pageSize = pageSize; this.numChannels = numChannels; this.bufferOrEvents = new ArrayDeque<BufferOrEvent>(bufferOrEvents); this.closed = new boolean[numChannels]; + this.owningTaskName = owningTaskName; } @Override @@ -61,6 +68,11 @@ public int getNumberOfInputChannels() { return numChannels; } + @Override + public String getOwningTaskName() { + return owningTaskName; + } + @Override public boolean isFinished() { return bufferOrEvents.isEmpty(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services