Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21346#discussion_r195819061
--- Diff:
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
---
@@ -130,6 +200,59 @@ public void onFailure(Throwable e) {
return res;
}
+ private RpcResult sendRpcWithStream(String... streams) throws Exception {
+ TransportClient client =
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+ final Semaphore sem = new Semaphore(0);
+ RpcResult res = new RpcResult();
+ res.successMessages = Collections.synchronizedSet(new
HashSet<String>());
+ res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
+
+ for (String stream : streams) {
+ int idx = stream.lastIndexOf('/');
+ ManagedBuffer meta = new
NioManagedBuffer(JavaUtils.stringToBytes(stream));
+ String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
+ ManagedBuffer data = testData.openStream(conf, streamName);
+ client.uploadStream(meta, data, new RpcStreamCallback(stream, res,
sem));
+ }
+
+ if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
+ fail("Timeout getting response from the server");
+ }
+ streamCallbacks.values().forEach(streamCallback -> {
+ try {
+
streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
--- End diff --
Isn't the wait part now redundant, after you waited for the semaphore?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]