This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e6f1428 HDDS-3013. Fix TestBlockOutputStreamWithFailures.java. (#592)
e6f1428 is described below
commit e6f1428f7da36f558b06f42dd3dc1dc40aa42535
Author: bshashikant <[email protected]>
AuthorDate: Tue Mar 3 22:30:13 2020 +0530
HDDS-3013. Fix TestBlockOutputStreamWithFailures.java. (#592)
---
.../rpc/TestBlockOutputStreamWithFailures.java | 419 ++-------------------
1 file changed, 24 insertions(+), 395 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 1b6b7dc..f026979 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -19,11 +19,10 @@ package org.apache.hadoop.ozone.client.rpc;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
@@ -43,7 +42,6 @@ import org.apache.ratis.protocol.RaftRetryFailureException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -58,7 +56,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTER
/**
* Tests failure detection and handling in BlockOutputStream Class.
*/
-@Ignore
public class TestBlockOutputStreamWithFailures {
private static MiniOzoneCluster cluster;
@@ -87,13 +84,30 @@ public class TestBlockOutputStreamWithFailures {
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000,
TimeUnit.MILLISECONDS);
- conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 10000, TimeUnit.SECONDS);
conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 3);
-
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+ DatanodeRatisServerConfig.RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_SERVER_PREFIX_KEY + "." +
+ DatanodeRatisServerConfig.
+ RATIS_SERVER_WATCH_REQUEST_TIMEOUT_KEY,
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
+ "rpc.request.timeout",
+ 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
+ "watch.request.timeout",
+ 3, TimeUnit.SECONDS);
+ conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 15);
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
.setTotalPipelineNumLimit(10).setBlockSize(blockSize)
.setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
@@ -127,17 +141,6 @@ public class TestBlockOutputStreamWithFailures {
@Test
public void testWatchForCommitWithCloseContainerException()
throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
- long putBlockCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
- long pendingWriteChunkCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
- long pendingPutBlockCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
@@ -147,20 +150,6 @@ public class TestBlockOutputStreamWithFailures {
.getBytes(UTF_8);
key.write(data1);
- // since its hitting the full bufferCondition, it will call watchForCommit
- // and completes atleast putBlock for first flushSize worth of data
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
- <= pendingWriteChunkCount + 2);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
- <= pendingPutBlockCount + 1);
- Assert.assertEquals(writeChunkCount + 4,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
-
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
@@ -194,22 +183,7 @@ public class TestBlockOutputStreamWithFailures {
// the map.
key.flush();
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 5,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 3,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
-
// flush is a sync call, all pending operations will complete
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
-
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
@@ -227,8 +201,6 @@ public class TestBlockOutputStreamWithFailures {
Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
// Close the containers on the Datanode and write more data
TestHelper.waitForContainerClose(key, cluster);
- // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
- // once exception is hit
key.write(data1);
// As a part of handling the exception, 4 failed writeChunks will be
@@ -251,15 +223,6 @@ public class TestBlockOutputStreamWithFailures {
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 14,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 8,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
@@ -267,17 +230,6 @@ public class TestBlockOutputStreamWithFailures {
@Test
public void testWatchForCommitDatanodeFailure() throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
- long putBlockCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
- long pendingWriteChunkCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
- long pendingPutBlockCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
@@ -288,17 +240,6 @@ public class TestBlockOutputStreamWithFailures {
key.write(data1);
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes at least putBlock for first flushSize worth of data
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
- <= pendingWriteChunkCount + 2);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
- <= pendingPutBlockCount + 1);
- Assert.assertEquals(writeChunkCount + 4,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
@@ -334,16 +275,6 @@ public class TestBlockOutputStreamWithFailures {
// the map.
key.flush();
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 5,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 3,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
-
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
@@ -368,12 +299,10 @@ public class TestBlockOutputStreamWithFailures {
key.write(data1);
key.flush();
- Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
// now close the stream, It will update the ack length after watchForCommit
key.close();
- Assert.assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
// Make sure the retryCount is reset after the exception is handled
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// make sure the bufferPool is empty
@@ -381,18 +310,6 @@ public class TestBlockOutputStreamWithFailures {
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
-
- // in total, there are 8 full write chunks + 2 partial chunks written
- Assert.assertEquals(writeChunkCount + 10,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- // 4 flushes at flushSize boundaries + 2 flush for partial chunks
- Assert.assertEquals(putBlockCount + 6,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 16, metrics.getTotalOpCount());
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
@@ -400,17 +317,6 @@ public class TestBlockOutputStreamWithFailures {
@Test
public void test2DatanodesFailure() throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
- long putBlockCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
- long pendingWriteChunkCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
- long pendingPutBlockCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = maxFlushSize + 50;
@@ -421,17 +327,6 @@ public class TestBlockOutputStreamWithFailures {
key.write(data1);
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes atleast putBlock for first flushSize worth of data
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
- <= pendingWriteChunkCount + 2);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
- <= pendingPutBlockCount + 1);
- Assert.assertEquals(writeChunkCount + 4,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
@@ -465,16 +360,6 @@ public class TestBlockOutputStreamWithFailures {
// the map.
key.flush();
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 5,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 3,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
-
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
@@ -522,21 +407,11 @@ public class TestBlockOutputStreamWithFailures {
Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
// now close the stream, It will update the ack length after watchForCommit
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
key.close();
Assert
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 14,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 8,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
// make sure the bufferPool is empty
Assert
@@ -548,38 +423,15 @@ public class TestBlockOutputStreamWithFailures {
@Test
public void testFailureWithPrimeSizedData() throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
- long putBlockCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
- long pendingWriteChunkCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
- long pendingPutBlockCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
- int dataLength = 167;
+ int dataLength = maxFlushSize + 69;
// write data more than 1 chunk
byte[] data1 =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(data1);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
- == pendingWriteChunkCount + 1);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
- == pendingPutBlockCount);
- Assert.assertEquals(writeChunkCount + 1,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
-
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
@@ -589,41 +441,17 @@ public class TestBlockOutputStreamWithFailures {
Assert.assertTrue(stream instanceof BlockOutputStream);
BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
- Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
+ Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
- Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
-
- Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
-
- Assert.assertTrue(
- blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
+ Assert.assertEquals(400, blockOutputStream.getTotalDataFlushedLength());
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 2,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 1,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
-
- // Since the data in the buffer is already flushed, flush here will have
- // no impact on the counters and data structures
-
- Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
- Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
Assert.assertEquals(dataLength,
blockOutputStream.getTotalDataFlushedLength());
- // flush will make sure one more entry gets updated in the map
- Assert.assertTrue(
- blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
XceiverClientRatis raftClient =
(XceiverClientRatis) blockOutputStream.getXceiverClient();
@@ -637,7 +465,6 @@ public class TestBlockOutputStreamWithFailures {
// and one flush for partial chunk
key.flush();
- Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
.getIoException()) instanceof ContainerNotOpenException);
// Make sure the retryCount is reset after the exception is handled
@@ -652,15 +479,6 @@ public class TestBlockOutputStreamWithFailures {
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 6,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 3,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount());
Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0);
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -669,17 +487,6 @@ public class TestBlockOutputStreamWithFailures {
@Test
public void testExceptionDuringClose() throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
- long putBlockCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
- long pendingWriteChunkCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
- long pendingPutBlockCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
int dataLength = 167;
@@ -689,18 +496,6 @@ public class TestBlockOutputStreamWithFailures {
.getBytes(UTF_8);
key.write(data1);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
- == pendingWriteChunkCount + 1);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
- == pendingPutBlockCount);
- Assert.assertEquals(writeChunkCount + 1,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
-
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
@@ -719,21 +514,9 @@ public class TestBlockOutputStreamWithFailures {
Assert.assertTrue(
blockOutputStream.getCommitIndex2flushedDataMap().size() == 0);
-
// Now do a flush. This will flush the data and update the flush length and
// the map.
key.flush();
-
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 2,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 1,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
-
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
@@ -767,15 +550,6 @@ public class TestBlockOutputStreamWithFailures {
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 6,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 3,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount());
Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -784,17 +558,6 @@ public class TestBlockOutputStreamWithFailures {
@Test
public void testWatchForCommitWithSingleNodeRatis() throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
- long putBlockCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
- long pendingWriteChunkCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
- long pendingPutBlockCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
@@ -805,20 +568,6 @@ public class TestBlockOutputStreamWithFailures {
.getBytes(UTF_8);
key.write(data1);
- // since its hitting the full bufferCondition, it will call watchForCommit
- // and completes atleast putBlock for first flushSize worth of data
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
- <= pendingWriteChunkCount + 2);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
- <= pendingPutBlockCount + 1);
- Assert.assertEquals(writeChunkCount + 4,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
-
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
@@ -852,22 +601,6 @@ public class TestBlockOutputStreamWithFailures {
// the map.
key.flush();
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 5,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 3,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
-
- // flush is a sync call, all pending operations will complete
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
-
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
@@ -909,15 +642,6 @@ public class TestBlockOutputStreamWithFailures {
Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 14,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 8,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
// Written the same data twice
String dataString = new String(data1, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes());
@@ -925,17 +649,6 @@ public class TestBlockOutputStreamWithFailures {
@Test
public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
- long putBlockCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
- long pendingWriteChunkCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
- long pendingPutBlockCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 0, ReplicationFactor.ONE);
@@ -947,17 +660,6 @@ public class TestBlockOutputStreamWithFailures {
key.write(data1);
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes at least putBlock for first flushSize worth of data
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
- <= pendingWriteChunkCount + 2);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
- <= pendingPutBlockCount + 1);
- Assert.assertEquals(writeChunkCount + 4,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
@@ -991,16 +693,6 @@ public class TestBlockOutputStreamWithFailures {
// the map.
key.flush();
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 5,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 3,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
-
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
@@ -1041,21 +733,6 @@ public class TestBlockOutputStreamWithFailures {
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
-
- // in total, there are 14 full write chunks, 5 before the failure injection,
- // 4 chunks after which we detect the failure and then 5 again on the next
- // block
- Assert.assertEquals(writeChunkCount + 14,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- // 3 flushes at flushSize boundaries before failure injection + 2
- // flush failed + 3 more flushes for the next block
- Assert.assertEquals(putBlockCount + 8,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
// Written the same data twice
String dataString = new String(data1, UTF_8);
@@ -1065,17 +742,6 @@ public class TestBlockOutputStreamWithFailures {
@Test
public void testDatanodeFailureWithPreAllocation() throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
- long putBlockCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
- long pendingWriteChunkCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
- long pendingPutBlockCount =
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
String keyName = getKeyName();
OzoneOutputStream key =
createKey(keyName, ReplicationType.RATIS, 3 * blockSize,
@@ -1088,17 +754,6 @@ public class TestBlockOutputStreamWithFailures {
key.write(data1);
// since its hitting the full bufferCondition, it will call watchForCommit
// and completes at least putBlock for first flushSize worth of data
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
- <= pendingWriteChunkCount + 2);
- Assert.assertTrue(
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
- <= pendingPutBlockCount + 1);
- Assert.assertEquals(writeChunkCount + 4,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
@@ -1132,16 +787,6 @@ public class TestBlockOutputStreamWithFailures {
// the map.
key.flush();
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(writeChunkCount + 5,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(putBlockCount + 3,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
-
// Since the data in the buffer is already flushed, flush here will have
// no impact on the counters and data structures
@@ -1182,22 +827,6 @@ public class TestBlockOutputStreamWithFailures {
.assertEquals(0,
blockOutputStream.getBufferPool().computeBufferData());
Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
- Assert.assertEquals(pendingWriteChunkCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
- Assert.assertEquals(pendingPutBlockCount,
- metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
-
- // in total, there are 14 full write chunks, 5 before the failure injection,
- // 4 chunks after which we detect the failure and then 5 again on the next
- // block
- Assert.assertEquals(writeChunkCount + 14,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-
- // 3 flushes at flushSize boundaries before failure injection + 2
- // flush failed + 3 more flushes for the next block
- Assert.assertEquals(putBlockCount + 8,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
- Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
// Written the same data twice
String dataString = new String(data1, UTF_8);
cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]