NicoK closed pull request #6470: [FLINK-10006][network] improve logging in BarrierBuffer: prepend owning task name
URL: https://github.com/apache/flink/pull/6470
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it for a merged pull request from a fork:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0413caa8aec..c78abb5165a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -69,6 +69,8 @@
 
        int getNumberOfInputChannels();
 
+       String getOwningTaskName();
+
        boolean isFinished();
 
        void requestPartitions() throws IOException, InterruptedException;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 06e80ff531a..2e7d076f3f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -62,10 +62,10 @@
 /**
  * An input gate consumes one or more partitions of a single produced 
intermediate result.
  *
- * <p> Each intermediate result is partitioned over its producing parallel 
subtasks; each of these
+ * <p>Each intermediate result is partitioned over its producing parallel 
subtasks; each of these
  * partitions is furthermore partitioned into one or more subpartitions.
  *
- * <p> As an example, consider a map-reduce program, where the map operator 
produces data and the
+ * <p>As an example, consider a map-reduce program, where the map operator 
produces data and the
  * reduce operator consumes the produced data.
  *
  * <pre>{@code
@@ -74,7 +74,7 @@
  * +-----+              +---------------------+              +--------+
  * }</pre>
  *
- * <p> When deploying such a program in parallel, the intermediate result will 
be partitioned over its
+ * <p>When deploying such a program in parallel, the intermediate result will 
be partitioned over its
  * producing parallel subtasks; each of these partitions is furthermore 
partitioned into one or more
  * subpartitions.
  *
@@ -95,7 +95,7 @@
  *               +-----------------------------------------+
  * }</pre>
  *
- * <p> In the above example, two map subtasks produce the intermediate result 
in parallel, resulting
+ * <p>In the above example, two map subtasks produce the intermediate result 
in parallel, resulting
  * in two partitions (Partition 1 and 2). Each of these partitions is further 
partitioned into two
  * subpartitions -- one for each parallel reduce subtask.
  */
@@ -274,6 +274,11 @@ public int getNumberOfQueuedBuffers() {
                return 0;
        }
 
+       @Override
+       public String getOwningTaskName() {
+               return owningTaskName;
+       }
+
        // 
------------------------------------------------------------------------
        // Setup/Life-cycle
        // 
------------------------------------------------------------------------
@@ -364,7 +369,7 @@ else if (partitionLocation.isRemote()) {
                                        throw new IllegalStateException("Tried 
to update unknown channel with unknown channel.");
                                }
 
-                               LOG.debug("Updated unknown input channel to 
{}.", newChannel);
+                               LOG.debug("{}: Updated unknown input channel to 
{}.", owningTaskName, newChannel);
 
                                inputChannels.put(partitionId, newChannel);
 
@@ -393,7 +398,7 @@ public void 
retriggerPartitionRequest(IntermediateResultPartitionID partitionId)
 
                                checkNotNull(ch, "Unknown input channel with ID 
" + partitionId);
 
-                               LOG.debug("Retriggering partition request 
{}:{}.", ch.partitionId, consumedSubpartitionIndex);
+                               LOG.debug("{}: Retriggering partition request 
{}:{}.", owningTaskName, ch.partitionId, consumedSubpartitionIndex);
 
                                if (ch.getClass() == RemoteInputChannel.class) {
                                        final RemoteInputChannel rch = 
(RemoteInputChannel) ch;
@@ -432,7 +437,8 @@ public void releaseAllResources() throws IOException {
                                                        
inputChannel.releaseAllResources();
                                                }
                                                catch (IOException e) {
-                                                       LOG.warn("Error during 
release of channel resources: " + e.getMessage(), e);
+                                                       LOG.warn("{}: Error 
during release of channel resources: {}.",
+                                                               owningTaskName, 
e.getMessage(), e);
                                                }
                                        }
 
@@ -725,7 +731,8 @@ else if (partitionLocation.isUnknown()) {
                        inputGate.setInputChannel(partitionId.getPartitionId(), 
inputChannels[i]);
                }
 
-               LOG.debug("Created {} input channels (local: {}, remote: {}, 
unknown: {}).",
+               LOG.debug("{}: Created {} input channels (local: {}, remote: 
{}, unknown: {}).",
+                       owningTaskName,
                        inputChannels.length,
                        numLocalChannels,
                        numRemoteChannels,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 742592a93f0..d3085cb5279 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -129,6 +129,12 @@ public int getNumberOfInputChannels() {
                return totalNumberOfInputChannels;
        }
 
+       @Override
+       public String getOwningTaskName() {
+               // all input gates have the same owning task
+               return inputGates[0].getOwningTaskName();
+       }
+
        @Override
        public boolean isFinished() {
                for (InputGate inputGate : inputGates) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 78852b82fdc..991635a9205 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -213,7 +213,7 @@ else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
        }
 
        private void completeBufferedSequence() throws IOException {
-               LOG.debug("Finished feeding back buffered data");
+               LOG.debug("{}: Finished feeding back buffered data.", 
inputGate.getOwningTaskName());
 
                currentBuffered.cleanup();
                currentBuffered = queuedBuffered.pollFirst();
@@ -247,8 +247,11 @@ private void processBarrier(CheckpointBarrier 
receivedBarrier, int channelIndex)
                        }
                        else if (barrierId > currentCheckpointId) {
                                // we did not complete the current checkpoint, 
another started before
-                               LOG.warn("Received checkpoint barrier for 
checkpoint {} before completing current checkpoint {}. " +
-                                               "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
+                               LOG.warn("{}: Received checkpoint barrier for 
checkpoint {} before completing current checkpoint {}. " +
+                                               "Skipping current checkpoint.",
+                                       inputGate.getOwningTaskName(),
+                                       barrierId,
+                                       currentCheckpointId);
 
                                // let the task know we are not completing this
                                notifyAbort(currentCheckpointId, new 
CheckpointDeclineSubsumedException(barrierId));
@@ -279,8 +282,10 @@ else if (barrierId > currentCheckpointId) {
                if (numBarriersReceived + numClosedChannels == 
totalNumberOfInputChannels) {
                        // actually trigger checkpoint
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Received all barriers, triggering 
checkpoint {} at {}",
-                                               receivedBarrier.getId(), 
receivedBarrier.getTimestamp());
+                               LOG.debug("{}: Received all barriers, 
triggering checkpoint {} at {}.",
+                                       inputGate.getOwningTaskName(),
+                                       receivedBarrier.getId(),
+                                       receivedBarrier.getTimestamp());
                        }
 
                        releaseBlocksAndResetBarriers();
@@ -309,7 +314,9 @@ private void 
processCancellationBarrier(CancelCheckpointMarker cancelBarrier) th
                        if (barrierId == currentCheckpointId) {
                                // cancel this alignment
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Checkpoint {} canceled, 
aborting alignment", barrierId);
+                                       LOG.debug("{}: Checkpoint {} canceled, 
aborting alignment.",
+                                               inputGate.getOwningTaskName(),
+                                               barrierId);
                                }
 
                                releaseBlocksAndResetBarriers();
@@ -317,8 +324,11 @@ private void 
processCancellationBarrier(CancelCheckpointMarker cancelBarrier) th
                        }
                        else if (barrierId > currentCheckpointId) {
                                // we canceled the next which also cancels the 
current
-                               LOG.warn("Received cancellation barrier for 
checkpoint {} before completing current checkpoint {}. " +
-                                               "Skipping current checkpoint.", 
barrierId, currentCheckpointId);
+                               LOG.warn("{}: Received cancellation barrier for 
checkpoint {} before completing current checkpoint {}. " +
+                                               "Skipping current checkpoint.",
+                                       inputGate.getOwningTaskName(),
+                                       barrierId,
+                                       currentCheckpointId);
 
                                // this stops the current alignment
                                releaseBlocksAndResetBarriers();
@@ -347,7 +357,9 @@ else if (barrierId > currentCheckpointId) {
                        latestAlignmentDurationNanos = 0L;
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Checkpoint {} canceled, skipping 
alignment", barrierId);
+                               LOG.debug("{}: Checkpoint {} canceled, skipping 
alignment.",
+                                       inputGate.getOwningTaskName(),
+                                       barrierId);
                        }
 
                        notifyAbortOnCancellationBarrier(barrierId);
@@ -401,8 +413,10 @@ private void notifyAbort(long checkpointId, 
CheckpointDeclineException cause) th
        private void checkSizeLimit() throws Exception {
                if (maxBufferedBytes > 0 && (numQueuedBytes + 
bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
                        // exceeded our limit - abort this checkpoint
-                       LOG.info("Checkpoint {} aborted because alignment 
volume limit ({} bytes) exceeded",
-                                       currentCheckpointId, maxBufferedBytes);
+                       LOG.info("{}: Checkpoint {} aborted because alignment 
volume limit ({} bytes) exceeded.",
+                               inputGate.getOwningTaskName(),
+                               currentCheckpointId,
+                               maxBufferedBytes);
 
                        releaseBlocksAndResetBarriers();
                        notifyAbort(currentCheckpointId, new 
AlignmentLimitExceededException(maxBufferedBytes));
@@ -444,7 +458,9 @@ private void beginNewAlignment(long checkpointId, int 
channelIndex) throws IOExc
                startOfAlignmentTimestamp = System.nanoTime();
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Starting stream alignment for checkpoint " + 
checkpointId + '.');
+                       LOG.debug("{}: Starting stream alignment for checkpoint 
{}.",
+                               inputGate.getOwningTaskName(),
+                               checkpointId);
                }
        }
 
@@ -470,7 +486,9 @@ private void onBarrier(int channelIndex) throws IOException 
{
                        numBarriersReceived++;
 
                        if (LOG.isDebugEnabled()) {
-                               LOG.debug("Received barrier from channel " + 
channelIndex);
+                               LOG.debug("{}: Received barrier from channel 
{}.",
+                                       inputGate.getOwningTaskName(),
+                                       channelIndex);
                        }
                }
                else {
@@ -483,7 +501,8 @@ private void onBarrier(int channelIndex) throws IOException 
{
         * Makes sure the just written data is the next to be consumed.
         */
        private void releaseBlocksAndResetBarriers() throws IOException {
-               LOG.debug("End of stream alignment, feeding buffered data 
back");
+               LOG.debug("{}: End of stream alignment, feeding buffered data 
back.",
+                       inputGate.getOwningTaskName());
 
                for (int i = 0; i < blockedChannels.length; i++) {
                        blockedChannels[i] = false;
@@ -499,8 +518,9 @@ private void releaseBlocksAndResetBarriers() throws 
IOException {
                else {
                        // uncommon case: buffered data pending
                        // push back the pending data, if we have any
-                       LOG.debug("Checkpoint skipped via buffered data:" +
-                                       "Pushing back current alignment buffers 
and feeding back new alignment data first.");
+                       LOG.debug("{}: Checkpoint skipped via buffered data:" +
+                                       "Pushing back current alignment buffers 
and feeding back new alignment data first.",
+                               inputGate.getOwningTaskName());
 
                        // since we did not fully drain the previous sequence, 
we need to allocate a new buffer for this one
                        BufferOrEventSequence bufferedNow = 
bufferBlocker.rollOverWithoutReusingResources();
@@ -513,8 +533,9 @@ private void releaseBlocksAndResetBarriers() throws 
IOException {
                }
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Size of buffered data: {} bytes",
-                                       currentBuffered == null ? 0L : 
currentBuffered.size());
+                       LOG.debug("{}: Size of buffered data: {} bytes",
+                               inputGate.getOwningTaskName(),
+                               currentBuffered == null ? 0L : 
currentBuffered.size());
                }
 
                // the next barrier that comes must assume it is the first
@@ -555,7 +576,10 @@ public long getAlignmentDurationNanos() {
 
        @Override
        public String toString() {
-               return String.format("last checkpoint: %d, current barriers: 
%d, closed channels: %d",
-                               currentCheckpointId, numBarriersReceived, 
numClosedChannels);
+               return String.format("%s: last checkpoint: %d, current 
barriers: %d, closed channels: %d",
+                       inputGate.getOwningTaskName(),
+                       currentCheckpointId,
+                       numBarriersReceived,
+                       numClosedChannels);
        }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 6dd1e5ef2d6..e968101ca2d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -139,11 +139,18 @@ public boolean isNextBarrier() {
                private int currentChannel = 0;
                private long c = 0;
 
+               private final String owningTaskName;
+
                public RandomGeneratingInputGate(BufferPool[] bufferPools, 
BarrierGenerator[] barrierGens) {
+                       this(bufferPools, barrierGens, "TestTask");
+               }
+
+               public RandomGeneratingInputGate(BufferPool[] bufferPools, 
BarrierGenerator[] barrierGens, String owningTaskName) {
                        this.numChannels = bufferPools.length;
                        this.currentBarriers = new int[numChannels];
                        this.bufferPools = bufferPools;
                        this.barrierGens = barrierGens;
+                       this.owningTaskName = owningTaskName;
                }
 
                @Override
@@ -151,6 +158,11 @@ public int getNumberOfInputChannels() {
                        return numChannels;
                }
 
+               @Override
+               public String getOwningTaskName() {
+                       return owningTaskName;
+               }
+
                @Override
                public boolean isFinished() {
                        return false;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index e62b709419f..6400a175e20 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -44,11 +44,18 @@
 
        private int closedChannels;
 
+       private final String owningTaskName;
+
        public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> 
bufferOrEvents) {
+               this(pageSize, numChannels, bufferOrEvents, "MockTask");
+       }
+
+       public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> 
bufferOrEvents, String owningTaskName) {
                this.pageSize = pageSize;
                this.numChannels = numChannels;
                this.bufferOrEvents = new 
ArrayDeque<BufferOrEvent>(bufferOrEvents);
                this.closed = new boolean[numChannels];
+               this.owningTaskName = owningTaskName;
        }
 
        @Override
@@ -61,6 +68,11 @@ public int getNumberOfInputChannels() {
                return numChannels;
        }
 
+       @Override
+       public String getOwningTaskName() {
+               return owningTaskName;
+       }
+
        @Override
        public boolean isFinished() {
                return bufferOrEvents.isEmpty();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to