This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push: new 29f585a Plumtree and gossip app feedback 29f585a is described below commit 29f585a760e29895de7f02a71f7e8b006934b865 Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Thu May 23 12:07:05 2019 -0700 Plumtree and gossip app feedback --- gossip/build.gradle | 2 ++ .../java/org/apache/tuweni/gossip/GossipApp.java | 15 +++++++++- .../tuweni/plumtree/vertx/VertxGossipServer.java | 34 +++++++++++++--------- .../plumtree/vertx/VertxGossipServerTest.java | 3 +- 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/gossip/build.gradle b/gossip/build.gradle index f708004..f0eba00 100644 --- a/gossip/build.gradle +++ b/gossip/build.gradle @@ -21,6 +21,8 @@ dependencies { compile 'info.picocli:picocli' compile 'io.vertx:vertx-core' compile 'org.bouncycastle:bcprov-jdk15on' + compile 'org.logl:logl-api' + compile 'org.logl:logl-logl' compile project(':bytes') compile project(':config') compile project(':plumtree') diff --git a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java index ad7293c..173ec7b 100644 --- a/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java +++ b/gossip/src/main/java/org/apache/tuweni/gossip/GossipApp.java @@ -27,6 +27,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.security.Security; import java.time.Instant; import java.util.Collections; import java.util.concurrent.CompletionException; @@ -39,6 +40,7 @@ import io.vertx.core.Vertx; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerRequest; +import org.bouncycastle.jce.provider.BouncyCastleProvider; import picocli.CommandLine; /** @@ -48,6 +50,7 @@ import picocli.CommandLine; public final class GossipApp { public static void main(String[] args) { + Security.addProvider(new BouncyCastleProvider()); GossipCommandLineOptions opts = CommandLine.populateCommand(new GossipCommandLineOptions(), args); try { opts.validate(); @@ -61,10 +64,12 @@ public final class GossipApp { System.exit(0); } GossipApp gossipApp = new GossipApp(Vertx.vertx(), opts, System.err, System.out, () -> System.exit(1)); - Runtime.getRuntime().addShutdownHook(new Thread(() -> gossipApp.stop())); + Runtime.getRuntime().addShutdownHook(new Thread(gossipApp::stop)); gossipApp.start(); } + + private final GossipCommandLineOptions opts; private final Runnable terminateFunction; private final PrintStream errStream; @@ -79,6 +84,7 @@ public final class GossipApp { PrintStream outStream, Runnable terminateFunction) { EphemeralPeerRepository repository = new EphemeralPeerRepository(); + outStream.println("Setting up server on " + opts.networkInterface() + ":" + opts.listenPort()); server = new VertxGossipServer( vertx, opts.networkInterface(), @@ -95,6 +101,7 @@ public final class GossipApp { } void start() { + outStream.println("Starting gossip"); AsyncCompletion completion = server.start(); try { completion.join(); @@ -102,6 +109,7 @@ public final class GossipApp { errStream.println("Server could not start: " + e.getMessage()); terminateFunction.run(); } + outStream.println("TCP server started"); CompletableAsyncCompletion rpcCompletion = AsyncCompletion.incomplete(); rpcServer.requestHandler(this::handleRPCRequest).listen(opts.rpcPort(), opts.networkInterface(), res -> { @@ -117,6 +125,7 @@ public final class GossipApp { errStream.println("RPC server could not start: " + e.getMessage()); terminateFunction.run(); } + outStream.println("RPC server started"); try { AsyncCompletion @@ -125,6 +134,7 @@ public final class GossipApp { } catch (TimeoutException | InterruptedException e) { errStream.println("Server could not connect to other peers: " + e.getMessage()); } + outStream.println("Gossip started"); } private void handleRPCRequest(HttpServerRequest httpServerRequest) { @@ -143,6 +153,7 @@ public final class GossipApp { } void stop() { + outStream.println("Stopping gossip"); try { server.stop().join(); } catch (InterruptedException e) { @@ -161,6 +172,7 @@ public final class GossipApp { try { rpcCompletion.join(); } catch (CompletionException | InterruptedException e) { + outStream.println("Stopped gossip"); errStream.println("RPC server could not stop: " + e.getMessage()); terminateFunction.run(); } @@ -184,6 +196,7 @@ public final class GossipApp { } public void publish(Bytes message) { + outStream.println("Message to publish " + message.toHexString()); server.gossip("", message); } } diff --git a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java index 9cb2d38..dce838d 100644 --- a/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java +++ b/plumtree/src/main/java/org/apache/tuweni/plumtree/vertx/VertxGossipServer.java @@ -187,26 +187,32 @@ public final class VertxGossipServer { if (!started.get()) { throw new IllegalStateException("Server has not started"); } + CompletableAsyncCompletion completion = AsyncCompletion.incomplete(); AtomicInteger counter = new AtomicInteger(0); - while (!completion.isDone()) { - client.connect(port, host, res -> { - if (res.failed()) { - if (counter.incrementAndGet() > 5) { - completion.completeExceptionally(res.cause()); - } - } else { - completion.complete(); - Peer peer = new SocketPeer(res.result()); - SocketHandler handler = new SocketHandler(peer); - res.result().handler(handler::handle).closeHandler(handler::close); - } - }); - } + + roundConnect(host, port, counter, completion); return completion; } + private void roundConnect(String host, int port, AtomicInteger counter, CompletableAsyncCompletion completion) { + client.connect(port, host, res -> { + if (res.failed()) { + if (counter.incrementAndGet() > 5) { + completion.completeExceptionally(res.cause()); + } else { + roundConnect(host, port, counter, completion); + } + } else { + Peer peer = new SocketPeer(res.result()); + SocketHandler handler = new SocketHandler(peer); + res.result().handler(handler::handle).closeHandler(handler::close); + completion.complete(); + } + }); + } + /** * Gossip a message to all known peers. * diff --git a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java index 19e2fa9..d5dd5d8 100644 --- a/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java +++ b/plumtree/src/test/java/org/apache/tuweni/plumtree/vertx/VertxGossipServerTest.java @@ -166,13 +166,14 @@ class VertxGossipServerTest { server1.connectTo("127.0.0.1", 10001).join(); server2.connectTo("127.0.0.1", 10002).join(); server1.connectTo("127.0.0.1", 10002).join(); + assertEquals(2, peerRepository1.eagerPushPeers().size()); String attributes = "{\"message_type\": \"BLOCK\"}"; server1.gossip(attributes, Bytes.fromHexString("deadbeef")); Thread.sleep(1000); assertEquals(Bytes.fromHexString("deadbeef"), messageReceived2.get()); Thread.sleep(1000); - assertTrue(peerRepository1.lazyPushPeers().size() > 1 || peerRepository3.lazyPushPeers().size() > 1); + assertTrue(peerRepository1.lazyPushPeers().size() > 0 || peerRepository3.lazyPushPeers().size() > 0); server1.stop().join(); server2.stop().join(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org