This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 0cee30e HDDS-2762. Sonar warnings fixed (#373)
0cee30e is described below
commit 0cee30e71e374e8e383641c25907fe7a407ae7f0
Author: Siddharth <[email protected]>
AuthorDate: Thu Dec 19 04:50:54 2019 -0800
HDDS-2762. Sonar warnings fixed (#373)
---
.../ozone/client/io/BlockOutputStreamEntry.java | 9 +-
.../client/io/BlockOutputStreamEntryPool.java | 24 ++--
.../hadoop/ozone/client/io/KeyOutputStream.java | 152 +++++++++++----------
3 files changed, 94 insertions(+), 91 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 8381be0..1aa10d8 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import java.util.Collection;
+import java.util.Collections;
/**
* Helper class used inside {@link BlockOutputStream}.
@@ -59,7 +60,7 @@ public final class BlockOutputStreamEntry extends
OutputStream {
private final long watchTimeout;
private BufferPool bufferPool;
- @SuppressWarnings("parameternumber")
+ @SuppressWarnings({"parameternumber", "squid:S00107"})
private BlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientManager xceiverClientManager,
Pipeline pipeline, String requestId, int chunkSize,
@@ -167,15 +168,15 @@ public final class BlockOutputStreamEntry extends
OutputStream {
}
}
- Collection<DatanodeDetails> getFailedServers() throws IOException {
+ Collection<DatanodeDetails> getFailedServers() {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
return out.getFailedServers();
}
- return null;
+ return Collections.emptyList();
}
- long getWrittenDataLength() throws IOException {
+ long getWrittenDataLength() {
if (outputStream != null) {
BlockOutputStream out = (BlockOutputStream) this.outputStream;
return out.getWrittenDataLength();
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 198feb1..abfbcc5 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -70,7 +70,7 @@ public class BlockOutputStreamEntryPool {
private final long openID;
private final ExcludeList excludeList;
- @SuppressWarnings("parameternumber")
+ @SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
int chunkSize, String requestId, HddsProtos.ReplicationFactor factor,
HddsProtos.ReplicationType type, long bufferFlushSize, long
bufferMaxSize,
@@ -227,9 +227,9 @@ public class BlockOutputStreamEntryPool {
while (streamEntryIterator.hasNext()) {
BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
- if ((pipelineId != null && streamEntry.getPipeline().getId()
- .equals(pipelineId)) || (containerID != -1
- && streamEntry.getBlockID().getContainerID() == containerID)) {
+ if ((streamEntry.getPipeline().getId().equals(pipelineId)) ||
+ (containerID != -1 &&
+ streamEntry.getBlockID().getContainerID() == containerID)) {
streamEntryIterator.remove();
}
}
@@ -249,7 +249,8 @@ public class BlockOutputStreamEntryPool {
}
long getKeyLength() {
- return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()).sum();
+ return streamEntries.stream().mapToLong(
+ BlockOutputStreamEntry::getCurrentPosition).sum();
}
/**
* Contact OM to get a new block. Set the new block with the index (e.g.
@@ -310,21 +311,12 @@ public class BlockOutputStreamEntryPool {
Preconditions.checkNotNull(omClient);
// allocate a new block, if a exception happens, log an error and
// throw exception to the caller directly, and the write fails.
- int succeededAllocates = 0;
- try {
- allocateNewBlock();
- succeededAllocates += 1;
- } catch (IOException ioe) {
- LOG.error("Try to allocate more blocks for write failed, already "
- + "allocated {} blocks for this write.", succeededAllocates, ioe);
- throw ioe;
- }
+ allocateNewBlock();
}
// in theory, this condition should never violate due the check above
// still do a sanity check.
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
- BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
- return current;
+ return streamEntries.get(currentStreamIndex);
}
long computeBufferData() {
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index ad0deec..28916f9 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -113,7 +113,7 @@ public class KeyOutputStream extends OutputStream {
return retryCount;
}
- @SuppressWarnings("parameternumber")
+ @SuppressWarnings({"parameternumber", "squid:S00107"})
public KeyOutputStream(OpenKeySession handler,
XceiverClientManager xceiverClientManager,
OzoneManagerProtocol omClient, int chunkSize,
@@ -201,32 +201,7 @@ public class KeyOutputStream extends OutputStream {
// comes via Exception path.
int writeLen = Math.min((int) len, (int) current.getRemaining());
long currentPos = current.getWrittenDataLength();
- try {
- if (retry) {
- current.writeOnRetry(len);
- } else {
- current.write(b, off, writeLen);
- offset += writeLen;
- }
- } catch (IOException ioe) {
- // for the current iteration, totalDataWritten - currentPos gives the
- // amount of data already written to the buffer
-
- // In the retryPath, the total data to be written will always be
equal
- // to or less than the max length of the buffer allocated.
- // The len specified here is the combined sum of the data length of
- // the buffers
- Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool
- .getStreamBufferMaxSize());
- int dataWritten = (int) (current.getWrittenDataLength() -
currentPos);
- writeLen = retry ? (int) len : dataWritten;
- // In retry path, the data written is already accounted in offset.
- if (!retry) {
- offset += writeLen;
- }
- LOG.debug("writeLen {}, total len {}", writeLen, len);
- handleException(current, ioe);
- }
+ writeToOutputStream(current, retry, len, b, writeLen, off, currentPos);
if (current.getRemaining() <= 0) {
// since the current block is already written close the stream.
handleFlushOrClose(StreamAction.FULL);
@@ -235,11 +210,42 @@ public class KeyOutputStream extends OutputStream {
off += writeLen;
} catch (Exception e) {
markStreamClosed();
- throw e;
+ throw new IOException("Allocate any more blocks for write failed", e);
}
}
}
+ private void writeToOutputStream(BlockOutputStreamEntry current,
+ boolean retry, long len, byte[] b, int writeLen, int off, long
currentPos)
+ throws IOException {
+ try {
+ if (retry) {
+ current.writeOnRetry(len);
+ } else {
+ current.write(b, off, writeLen);
+ offset += writeLen;
+ }
+ } catch (IOException ioe) {
+ // for the current iteration, totalDataWritten - currentPos gives the
+ // amount of data already written to the buffer
+
+ // In the retryPath, the total data to be written will always be equal
+ // to or less than the max length of the buffer allocated.
+ // The len specified here is the combined sum of the data length of
+ // the buffers
+ Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool
+ .getStreamBufferMaxSize());
+ int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+ writeLen = retry ? (int) len : dataWritten;
+ // In retry path, the data written is already accounted in offset.
+ if (!retry) {
+ offset += writeLen;
+ }
+ LOG.debug("writeLen {}, total len {}", writeLen, len);
+ handleException(current, ioe);
+ }
+ }
+
/**
* It performs following actions :
* a. Updates the committed length at datanode for the current stream in
@@ -342,7 +348,7 @@ public class KeyOutputStream extends OutputStream {
try {
action = retryPolicy.shouldRetry(exception, retryCount, 0, true);
} catch (Exception e) {
- throw e instanceof IOException ? (IOException) e : new IOException(e);
+ throw new IOException(e);
}
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
String msg = "";
@@ -371,8 +377,10 @@ public class KeyOutputStream extends OutputStream {
}
}
retryCount++;
- LOG.trace("Retrying Write request. Already tried " + retryCount
- + " time(s); retry policy is " + retryPolicy);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Retrying Write request. Already tried {} time(s); " +
+ "retry policy is {} ", retryCount, retryPolicy);
+ }
handleWrite(null, 0, len, true);
}
@@ -413,52 +421,56 @@ public class KeyOutputStream extends OutputStream {
* outputStream.
* @throws IOException In case, flush or close fails with exception.
*/
+ @SuppressWarnings("squid:S1141")
private void handleFlushOrClose(StreamAction op) throws IOException {
- if (blockOutputStreamEntryPool.isEmpty()) {
- return;
- }
- while (true) {
- try {
- BlockOutputStreamEntry entry =
- blockOutputStreamEntryPool.getCurrentStreamEntry();
- if (entry != null) {
- try {
- Collection<DatanodeDetails> failedServers =
- entry.getFailedServers();
- // failed servers can be null in case there is no data written in
- // the stream
- if (failedServers != null && !failedServers.isEmpty()) {
- blockOutputStreamEntryPool.getExcludeList()
- .addDatanodes(failedServers);
- }
- switch (op) {
- case CLOSE:
- entry.close();
- break;
- case FULL:
- if (entry.getRemaining() == 0) {
- entry.close();
- }
- break;
- case FLUSH:
- entry.flush();
- break;
- default:
- throw new IOException("Invalid Operation");
+ if (!blockOutputStreamEntryPool.isEmpty()) {
+ while (true) {
+ try {
+ BlockOutputStreamEntry entry =
+ blockOutputStreamEntryPool.getCurrentStreamEntry();
+ if (entry != null) {
+ try {
+ handleStreamAction(entry, op);
+ } catch (IOException ioe) {
+ handleException(entry, ioe);
+ continue;
}
- } catch (IOException ioe) {
- handleException(entry, ioe);
- continue;
}
+ return;
+ } catch (Exception e) {
+ markStreamClosed();
+ throw e;
}
- break;
- } catch (Exception e) {
- markStreamClosed();
- throw e;
}
}
}
+ private void handleStreamAction(BlockOutputStreamEntry entry,
+ StreamAction op) throws IOException {
+ Collection<DatanodeDetails> failedServers = entry.getFailedServers();
+ // failed servers can be null in case there is no data written in
+ // the stream
+ if (!failedServers.isEmpty()) {
+ blockOutputStreamEntryPool.getExcludeList().addDatanodes(
+ failedServers);
+ }
+ switch (op) {
+ case CLOSE:
+ entry.close();
+ break;
+ case FULL:
+ if (entry.getRemaining() == 0) {
+ entry.close();
+ }
+ break;
+ case FLUSH:
+ entry.flush();
+ break;
+ default:
+ throw new IOException("Invalid Operation");
+ }
+ }
+
/**
* Commit the key to OM, this will add the blocks as the new key blocks.
*
@@ -473,8 +485,6 @@ public class KeyOutputStream extends OutputStream {
try {
handleFlushOrClose(StreamAction.CLOSE);
blockOutputStreamEntryPool.commitKey(offset);
- } catch (IOException ioe) {
- throw ioe;
} finally {
blockOutputStreamEntryPool.cleanup();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]