Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21346#discussion_r195591573
--- Diff:
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
---
@@ -207,9 +400,67 @@ private void assertErrorsContain(Set<String> errors,
Set<String> contains) {
break;
}
}
- assertTrue("Could not find error containing " + contain + "; errors:
" + errors, foundMatch);
+ if (!foundMatch) {
+ notFound.add(contain);
+ }
+ }
+ return new ImmutablePair<>(remainingErrors, notFound);
+ }
+
+ private static class VerifyingStreamCallback implements
StreamCallbackWithID {
+ final String streamId;
+ final StreamSuite.TestCallback helper;
+ final OutputStream out;
+ final File outFile;
+ VerifyingStreamCallback(String streamId) throws IOException {
+ if (streamId.equals("file")) {
+ outFile = File.createTempFile("data", ".tmp", testData.tempDir);
+ out = new FileOutputStream(outFile);
+ } else {
+ out = new ByteArrayOutputStream();
+ outFile = null;
+ }
+ this.streamId = streamId;
+ helper = new StreamSuite.TestCallback(out);
+ }
+
+ void waitForCompletionAndVerify(long timeoutMs) throws IOException {
+ helper.waitForCompletion(timeoutMs);
+ if (streamId.equals("file")) {
+ assertTrue("File stream did not match.",
Files.equal(testData.testFile, outFile));
+ } else {
+ byte[] result = ((ByteArrayOutputStream)out).toByteArray();
+ ByteBuffer srcBuffer = testData.srcBuffer(streamId);
+ ByteBuffer base;
+ synchronized (srcBuffer) {
+ base = srcBuffer.duplicate();
+ }
+ byte[] expected = new byte[base.remaining()];
+ base.get(expected);
+ assertEquals(expected.length, result.length);
+ assertTrue("buffers don't match", Arrays.equals(expected, result));
+
--- End diff --
nit: remove
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]