dcapwell commented on code in PR #2104:
URL: https://github.com/apache/cassandra/pull/2104#discussion_r1086003553
##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -3416,6 +3416,11 @@ public static boolean streamEntireSSTables()
return conf.stream_entire_sstables;
}
+ public static long timeoutDelay()
Review Comment:
the naming pattern for getters here is `getTimeoutDelay`; this should follow it
##########
src/java/org/apache/cassandra/streaming/SessionInfo.java:
##########
@@ -205,4 +209,8 @@ public SessionSummary createSummary()
{
return new SessionSummary(FBUtilities.getBroadcastAddressAndPort(),
peer, receivingSummaries, sendingSummaries);
}
+
+ public String getFailurereason() {
Review Comment:
should be `getFailureReason()`
##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -981,7 +988,7 @@ public void streamSent(OutgoingStreamMessage message)
StreamTransferTask task = transfers.get(message.header.tableId);
if (task != null)
{
- task.scheduleTimeout(message.header.sequenceNumber, 12,
TimeUnit.HOURS);
+ task.scheduleTimeout(message.header.sequenceNumber,
DatabaseDescriptor.timeoutDelay(), TimeUnit.MILLISECONDS);
Review Comment:
this is brittle; rather than expecting the config to be in MILLISECONDS, we
should have the DD (DatabaseDescriptor) return the Duration and convert to
nanos. This allows users to define any value they want.
##########
test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.Shared;
+import org.assertj.core.api.Assertions;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class StreamFailureTest extends TestBaseImpl
+{
+
+ private static final int FAILINGNODE = 2;
+
+ private static final Logger logger =
LoggerFactory.getLogger(StreamFailureTest.class);
+
+
+ @Test
+ public void failureInTheMiddleWithUnknown() throws IOException
+ {
+ streamTest(true, "java.lang.RuntimeException: TEST", FAILINGNODE);
+ }
+
+ @Test
+ public void failureInTheMiddleWithEOF() throws IOException
+ {
+ streamTest(false, "Session peer /127.0.0.1:7012 Failed because there
was an java.nio.channels.ClosedChannelException with state=STREAMING",
FAILINGNODE);
+ }
+
+ @Test
+ public void failureDueToSessionFailed() throws IOException
+ {
+ streamTest(true,"Remote peer /127.0.0.2:7012 failed stream session",
1);
+ }
+
+ @Test
+ public void failureDueToSessionTimeout() throws IOException
+ {
+ streamTest2("Failed because the session timed out");
+ }
+
+ private void streamTest(boolean zeroCopyStreaming, String reason, Integer
failedNode) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
+
.withInstanceInitializer(BBHelper::install)
+ .withConfig(c -> c.with(Feature.values())
+
.set("stream_entire_sstables", zeroCopyStreaming)
+ // when die, this will
try to halt JVM, which is easier to validate in the test
+ // other levels
require checking state of the subsystems
+
.set("disk_failure_policy", "die"))
+ .start())
+ {
+ init(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY)"));
+
+ triggerStreaming(cluster, zeroCopyStreaming);
+ // make sure disk failure policy is not triggered
+
+ IInvokableInstance failingNode = cluster.get(failedNode);
+
+ searchForLog(failingNode, reason);
+ }
+ }
+
+ private void streamTest2(String reason) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
Review Comment:
why 3 tokens for a 2 node cluster?
##########
src/java/org/apache/cassandra/streaming/StreamResultFuture.java:
##########
@@ -239,8 +239,14 @@ private synchronized void maybeComplete()
StreamState finalState = getCurrentState();
if (finalState.hasFailedSession())
{
- logger.warn("[Stream #{}] Stream failed", planId);
- tryFailure(new StreamException(finalState, "Stream failed"));
+ StringBuilder stringBuilder = new StringBuilder();
+ for (SessionInfo info : finalState.getAllSessionInfo())
+ {
+ if (info.isFailed())
+ stringBuilder.append("\nSession peer
").append(info.peer).append(info.failurereason);
+ }
+ logger.warn("[Stream #{}] Stream failed: {}", planId,
stringBuilder);
+ tryFailure(new StreamException(finalState, "Stream failed: " +
stringBuilder));
Review Comment:
> Could also convert to something like String message =
stringBuilder.toString();
Yes please!
##########
src/java/org/apache/cassandra/streaming/StreamResultFuture.java:
##########
@@ -239,8 +239,14 @@ private synchronized void maybeComplete()
StreamState finalState = getCurrentState();
if (finalState.hasFailedSession())
{
- logger.warn("[Stream #{}] Stream failed", planId);
- tryFailure(new StreamException(finalState, "Stream failed"));
+ StringBuilder stringBuilder = new StringBuilder();
+ for (SessionInfo info : finalState.getAllSessionInfo())
+ {
+ if (info.isFailed())
+ stringBuilder.append("\nSession peer
").append(info.peer).append(info.failurereason);
+ }
+ logger.warn("[Stream #{}] Stream failed: {}", planId,
stringBuilder);
+ tryFailure(new StreamException(finalState, "Stream failed: " +
stringBuilder));
Review Comment:
> but you could include the common "Stream failed:" in the string builder.
I don't see it added to the string builder, but I do think it should be added
to get rid of the copy/paste.
I would do the following at line 243
```
stringBuilder.append("Stream failed:");
```
then get rid of "Stream failed:" in logger and exception.
##########
test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.Shared;
+import org.assertj.core.api.Assertions;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class StreamFailureTest extends TestBaseImpl
+{
+
+ private static final int FAILINGNODE = 2;
+
+ private static final Logger logger =
LoggerFactory.getLogger(StreamFailureTest.class);
+
+
+ @Test
+ public void failureInTheMiddleWithUnknown() throws IOException
+ {
+ streamTest(true, "java.lang.RuntimeException: TEST", FAILINGNODE);
+ }
+
+ @Test
+ public void failureInTheMiddleWithEOF() throws IOException
+ {
+ streamTest(false, "Session peer /127.0.0.1:7012 Failed because there
was an java.nio.channels.ClosedChannelException with state=STREAMING",
FAILINGNODE);
+ }
+
+ @Test
+ public void failureDueToSessionFailed() throws IOException
+ {
+ streamTest(true,"Remote peer /127.0.0.2:7012 failed stream session",
1);
+ }
+
+ @Test
+ public void failureDueToSessionTimeout() throws IOException
+ {
+ streamTest2("Failed because the session timed out");
+ }
+
+ private void streamTest(boolean zeroCopyStreaming, String reason, Integer
failedNode) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
+
.withInstanceInitializer(BBHelper::install)
+ .withConfig(c -> c.with(Feature.values())
+
.set("stream_entire_sstables", zeroCopyStreaming)
+ // when die, this will
try to halt JVM, which is easier to validate in the test
+ // other levels
require checking state of the subsystems
+
.set("disk_failure_policy", "die"))
+ .start())
+ {
+ init(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY)"));
+
+ triggerStreaming(cluster, zeroCopyStreaming);
+ // make sure disk failure policy is not triggered
+
+ IInvokableInstance failingNode = cluster.get(failedNode);
+
+ searchForLog(failingNode, reason);
+ }
+ }
+
+ private void streamTest2(String reason) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
+ .withInstanceInitializer(BB::install)
+ .withConfig(c -> c.with(Feature.values())
+ // when die, this will
try to halt JVM, which is easier to validate in the test
+ // other levels
require checking state of the subsystems
+ .set("timeout_delay",
"1ms"))
+ .start())
+ {
+
+ init(cluster);
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY)"));
+
+ ForkJoinPool.commonPool().execute(() -> triggerStreaming(cluster,
true));
+ State.STREAM_IS_RUNNING.await();
+ logger.info("Streaming is running... time to wake it up");
+ State.UNBLOCK_STREAM.signal();
+
+ IInvokableInstance failingNode = cluster.get(1);
+
+ searchForLog(failingNode, reason);
+
+
+ }
+ }
+
+
+ private void triggerStreaming(Cluster cluster, boolean
expectedEntireSSTable)
+ {
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+
+ // repair will do streaming IFF there is a mismatch; so cause one
+ for (int i = 0; i < 10; i++)
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES
(?)"), i); // timestamp won't match, causing a mismatch
+
+ // trigger streaming; expected to fail as streaming socket closed in
the middle (currently this is an unrecoverable event)
+ //Blocks until the stream is complete
+ node2.nodetoolResult("repair", "-full", KEYSPACE,
"tbl").asserts().failure();
+ }
+
+ private void searchForLog(IInvokableInstance failingNode, String reason)
+ {
+ LogResult<List<String>> result = failingNode.logs().grepForErrors(-1,
Pattern.compile("Stream failed:"));
+ // grepForErrors will include all ERROR logs even if they don't match
the pattern; for this reason need to filter after the fact
+ List<String> matches = result.getResult();
+
+ matches = matches.stream().filter(s -> s.startsWith("WARN") &&
s.contains("Stream failed")).collect(Collectors.toList());
Review Comment:
wouldn't `Stream failed` match everything? We explicitly searched for
that... so everything *must* contain that string.
##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -154,7 +155,7 @@
* (via {@link org.apache.cassandra.net.MessagingService}, while the actual
files themselves are sent by a special
* "streaming" connection type. See {@link StreamingMultiplexedChannel} for
details. Because of the asynchronous
*/
-public class StreamSession implements IEndpointStateChangeSubscriber
Review Comment:
can you clean up the imports above? We shouldn't have imports that are not used
##########
src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java:
##########
@@ -151,7 +151,7 @@ public StreamSummary apply(CompositeData input)
connecting,
fromArrayOfCompositeData((CompositeData[]) values[5], toStreamSummary),
fromArrayOfCompositeData((CompositeData[]) values[6], toStreamSummary),
-
StreamSession.State.valueOf((String) values[7]));
+
StreamSession.State.valueOf((String) values[7]), null);
Review Comment:
please document why you are using `null` here. It's fine to say it's null to
maintain backwards compatibility, but it's still best to say this so the next
reader doesn't question why it's null
##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -682,8 +691,7 @@ public Future<?> onError(Throwable e)
planId(),
peer.getHostAddressAndPort(),
e);
-
- return closeSession(State.FAILED);
+ return closeSession(State.FAILED, " Failed because there was
an " + e.getClass().getCanonicalName() + " with state=" + state.name());
Review Comment:
can you remove the leading space?
##########
test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.Shared;
+import org.assertj.core.api.Assertions;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class StreamFailureTest extends TestBaseImpl
+{
+
+ private static final int FAILINGNODE = 2;
+
+ private static final Logger logger =
LoggerFactory.getLogger(StreamFailureTest.class);
+
+
+ @Test
+ public void failureInTheMiddleWithUnknown() throws IOException
+ {
+ streamTest(true, "java.lang.RuntimeException: TEST", FAILINGNODE);
+ }
+
+ @Test
+ public void failureInTheMiddleWithEOF() throws IOException
+ {
+ streamTest(false, "Session peer /127.0.0.1:7012 Failed because there
was an java.nio.channels.ClosedChannelException with state=STREAMING",
FAILINGNODE);
+ }
+
+ @Test
+ public void failureDueToSessionFailed() throws IOException
+ {
+ streamTest(true,"Remote peer /127.0.0.2:7012 failed stream session",
1);
+ }
+
+ @Test
+ public void failureDueToSessionTimeout() throws IOException
+ {
+ streamTest2("Failed because the session timed out");
+ }
+
+ private void streamTest(boolean zeroCopyStreaming, String reason, Integer
failedNode) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
Review Comment:
why are you setting 3 tokens for a 2 node cluster?
##########
test/distributed/org/apache/cassandra/distributed/test/streaming/StreamFailureTest.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.streaming.CassandraIncomingFile;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.LogResult;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.utils.Shared;
+import org.assertj.core.api.Assertions;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class StreamFailureTest extends TestBaseImpl
+{
+
+ private static final int FAILINGNODE = 2;
+
+ private static final Logger logger =
LoggerFactory.getLogger(StreamFailureTest.class);
+
+
+ @Test
+ public void failureInTheMiddleWithUnknown() throws IOException
+ {
+ streamTest(true, "java.lang.RuntimeException: TEST", FAILINGNODE);
+ }
+
+ @Test
+ public void failureInTheMiddleWithEOF() throws IOException
+ {
+ streamTest(false, "Session peer /127.0.0.1:7012 Failed because there
was an java.nio.channels.ClosedChannelException with state=STREAMING",
FAILINGNODE);
+ }
+
+ @Test
+ public void failureDueToSessionFailed() throws IOException
+ {
+ streamTest(true,"Remote peer /127.0.0.2:7012 failed stream session",
1);
+ }
+
+ @Test
+ public void failureDueToSessionTimeout() throws IOException
+ {
+ streamTest2("Failed because the session timed out");
+ }
+
+ private void streamTest(boolean zeroCopyStreaming, String reason, Integer
failedNode) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
+
.withInstanceInitializer(BBHelper::install)
+ .withConfig(c -> c.with(Feature.values())
+
.set("stream_entire_sstables", zeroCopyStreaming)
+ // when die, this will
try to halt JVM, which is easier to validate in the test
+ // other levels
require checking state of the subsystems
+
.set("disk_failure_policy", "die"))
+ .start())
+ {
+ init(cluster);
+
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY)"));
+
+ triggerStreaming(cluster, zeroCopyStreaming);
+ // make sure disk failure policy is not triggered
+
+ IInvokableInstance failingNode = cluster.get(failedNode);
+
+ searchForLog(failingNode, reason);
+ }
+ }
+
+ private void streamTest2(String reason) throws IOException
+ {
+ try (Cluster cluster = Cluster.build(2)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
+ .withInstanceInitializer(BB::install)
+ .withConfig(c -> c.with(Feature.values())
+ // when die, this will
try to halt JVM, which is easier to validate in the test
+ // other levels
require checking state of the subsystems
+ .set("timeout_delay",
"1ms"))
+ .start())
+ {
+
+ init(cluster);
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int
PRIMARY KEY)"));
+
+ ForkJoinPool.commonPool().execute(() -> triggerStreaming(cluster,
true));
+ State.STREAM_IS_RUNNING.await();
+ logger.info("Streaming is running... time to wake it up");
+ State.UNBLOCK_STREAM.signal();
+
+ IInvokableInstance failingNode = cluster.get(1);
+
+ searchForLog(failingNode, reason);
+
+
+ }
+ }
+
+
+ private void triggerStreaming(Cluster cluster, boolean
expectedEntireSSTable)
+ {
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+
+ // repair will do streaming IFF there is a mismatch; so cause one
+ for (int i = 0; i < 10; i++)
+ node1.executeInternal(withKeyspace("INSERT INTO %s.tbl (pk) VALUES
(?)"), i); // timestamp won't match, causing a mismatch
+
+ // trigger streaming; expected to fail as streaming socket closed in
the middle (currently this is an unrecoverable event)
+ //Blocks until the stream is complete
+ node2.nodetoolResult("repair", "-full", KEYSPACE,
"tbl").asserts().failure();
+ }
+
+ private void searchForLog(IInvokableInstance failingNode, String reason)
+ {
+ LogResult<List<String>> result = failingNode.logs().grepForErrors(-1,
Pattern.compile("Stream failed:"));
+ // grepForErrors will include all ERROR logs even if they don't match
the pattern; for this reason need to filter after the fact
+ List<String> matches = result.getResult();
+
+ matches = matches.stream().filter(s -> s.startsWith("WARN") &&
s.contains("Stream failed")).collect(Collectors.toList());
+ logger.info("Stream failed logs found: {}", String.join("\n",
matches));
+
+ Assertions.assertThat(matches)
+ .describedAs("node%d expected 1 element but was not true",
failingNode.config().num()).hasSize(1);
Review Comment:
nit: `.hasSize` should be next line
##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -514,13 +517,20 @@ synchronized void
addTransferStreams(Collection<OutgoingStream> streams)
}
}
- private synchronized Future<?> closeSession(State finalState)
+ private Future<?> closeSession(State finalState)
+ {
+ return closeSession(finalState, null);
+ }
+
+ private synchronized Future<?> closeSession(State finalState, String
reason)
Review Comment:
agree, also this is a single failure and not plural, so the variable
shouldn't be plural
##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -694,8 +702,7 @@ public Future<?> onError(Throwable e)
state(State.FAILED); // make sure subsequent error handling sees
the session in a final state
channel.sendControlMessage(new
SessionFailedMessage()).awaitUninterruptibly();
}
-
- return closeSession(State.FAILED);
+ return closeSession(State.FAILED, "Failed because of an unkown
exception;\n" + Throwables.getStackTraceAsString(e));
Review Comment:
> The details of the exception aren't provided, only the stack trace which
might lose info. We've already logged the main error details including the
stacktrace, so a summary with just the name/cause may be more useful when
debugging.
what details are you looking for? What info is lost?
We may have logged the error, but that makes debugging harder as we need
to link 2 logs or a log with a vtable... IMO it's best to include the exception
so while debugging everything is in one place
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]