This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new 7191c74  Hobbits implementation for TCP/HTTP/UDP/WS. Still WIP
7191c74 is described below

commit 7191c7498d3da3c15e21b57866dc7f8cff0595d4
Author: Antoine Toulme <anto...@lunar-ocean.com>
AuthorDate: Fri Jun 7 13:31:14 2019 -0700

    Hobbits implementation for TCP/HTTP/UDP/WS. Still WIP
---
 .../org/apache/tuweni/hobbits/HobbitsTransport.kt  | 178 ++++++++++++---
 .../kotlin/org/apache/tuweni/hobbits/Message.kt    |  10 +
 .../apache/tuweni/hobbits/HobbitsTransportTest.kt  |   2 +-
 .../org/apache/tuweni/hobbits/InteractionTest.kt   | 246 +++++++++++++++++++++
 4 files changed, 409 insertions(+), 27 deletions(-)

diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
index 4c1d16c..8f3ddf0 100644
--- a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
+++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt
@@ -26,6 +26,7 @@ import io.vertx.core.net.NetClient
 import io.vertx.core.net.NetServer
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Dispatchers
+import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.coroutines.await
 import java.lang.IllegalArgumentException
@@ -38,7 +39,7 @@ import kotlin.coroutines.CoroutineContext
 /**
  * Hobbits is a peer-to-peer transport stack specified at 
https://www.github.com/deltap2p/hobbits.
  *
- * This class works as a transport mechanism that can leverage a variety of 
network transport
+ * This class works as a transport mechanism that can leverage a variety of 
network transport
  * mechanisms, such as TCP, HTTP, UDP and Web sockets.
  *
  * It can be used to contact other Hobbits endpoints, or to expose endpoints 
to the network.
@@ -56,27 +57,42 @@ class HobbitsTransport(
   private val udpEndpoints = mutableMapOf<String, Endpoint>()
   private val wsEndpoints = mutableMapOf<String, Endpoint>()
 
+  private var exceptionHandler: ((Throwable) -> Unit)? = { }
+
   private var httpClient: HttpClient? = null
   private var tcpClient: NetClient? = null
   private var udpClient: DatagramSocket? = null
 
-  private var httpServer: HttpServer? = null
-  private var tcpServer: NetServer? = null
+  private val httpServers = mutableMapOf<String, HttpServer>()
+  private val tcpServers = mutableMapOf<String, NetServer>()
+  private val udpServers = mutableMapOf<String, DatagramSocket>()
+  private val wsServers = mutableMapOf<String, HttpServer>()
+
+  /**
+   * Sets an exception handler that will be called whenever an exception 
occurs during transport.
+   */
+  fun exceptionHandler(handler: (Throwable) -> Unit) {
+    exceptionHandler = handler
+  }
 
   /**
    * Creates a new endpoint over http.
    * @param networkInterface the network interface to bind the endpoint to
    * @param port the port to serve traffic from
+   * @param requestURI the request URI path to match
    * @param tls whether the endpoint should be secured using TLS
+   * @param handler function called when a message is received
    */
   fun createHTTPEndpoint(
     id: String = "default",
     networkInterface: String = "0.0.0.0",
     port: Int = 9337,
-    tls: Boolean = false
+    requestURI: String? = null,
+    tls: Boolean = false,
+    handler: (Message) -> Unit
   ) {
     checkNotStarted()
-    httpEndpoints[id] = Endpoint(networkInterface, port, tls)
+    httpEndpoints[id] = Endpoint(networkInterface, port, requestURI, tls, 
handler)
   }
 
   /**
@@ -84,15 +100,17 @@ class HobbitsTransport(
    * @param networkInterface the network interface to bind the endpoint to
    * @param port the port to serve traffic from
    * @param tls whether the endpoint should be secured using TLS
+   * @param handler function called when a message is received
    */
   fun createTCPEndpoint(
     id: String = "default",
     networkInterface: String = "0.0.0.0",
     port: Int = 9237,
-    tls: Boolean = false
+    tls: Boolean = false,
+    handler: (Message) -> Unit
   ) {
     checkNotStarted()
-    tcpEndpoints[id] = Endpoint(networkInterface, port, tls)
+    tcpEndpoints[id] = Endpoint(networkInterface, port, null, tls, handler)
   }
 
   /**
@@ -100,43 +118,54 @@ class HobbitsTransport(
    * @param networkInterface the network interface to bind the endpoint to
    * @param port the port to serve traffic from
    * @param tls whether the endpoint should be secured using TLS
+   * @param handler function called when a message is received
    */
-  fun createUDPEndpoint(id: String = "default", networkInterface: String = 
"0.0.0.0", port: Int = 9137) {
+  fun createUDPEndpoint(
+    id: String = "default",
+    networkInterface: String = "0.0.0.0",
+    port: Int = 9137,
+    handler: (Message) -> Unit
+  ) {
     checkNotStarted()
-    udpEndpoints[id] = Endpoint(networkInterface, port, false)
+    udpEndpoints[id] = Endpoint(networkInterface, port, null, false, handler)
   }
 
   /**
    * Creates a new endpoint over websocket connections.
    * @param networkInterface the network interface to bind the endpoint to
    * @param port the port to serve traffic from
+   * @param requestURI the request URI path to match
    * @param tls whether the endpoint should be secured using TLS
+   * @param handler function called when a message is received
    */
   fun createWSEndpoint(
     id: String = "default",
     networkInterface: String = "0.0.0.0",
     port: Int = 9037,
-    tls: Boolean = false
+    requestURI: String? = null,
+    tls: Boolean = false,
+    handler: (Message) -> Unit
   ) {
     checkNotStarted()
-    wsEndpoints[id] = Endpoint(networkInterface, port, tls)
+    wsEndpoints[id] = Endpoint(networkInterface, port, requestURI, tls, 
handler)
   }
 
   /**
    * Sends a message using the transport specified.
    *
    */
-  suspend fun sendMessage(message: Message, transport: Transport, host: 
String, port: Int) {
+  suspend fun sendMessage(message: Message, transport: Transport, host: 
String, port: Int, requestURI: String = "") {
     checkStarted()
     val completion = AsyncCompletion.incomplete()
     when (transport) {
       Transport.HTTP -> {
         @Suppress("DEPRECATION")
-        val req = httpClient!!.request(HttpMethod.POST, port, host, 
"/").handler {
+        val req = httpClient!!.request(HttpMethod.POST, port, host, requestURI)
+          .exceptionHandler(exceptionHandler).handler {
           if (it.statusCode() == 200) {
             completion.complete()
           } else {
-            completion.completeExceptionally(RuntimeException())
+            completion.completeExceptionally(RuntimeException("${it.statusCode()}"))
           }
         }
         req.end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
@@ -146,16 +175,29 @@ class HobbitsTransport(
           if (res.failed()) {
             completion.completeExceptionally(res.cause())
           } else {
-            res.result().end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
+            res.result().exceptionHandler(exceptionHandler).end(Buffer.buffer(message.toBytes().toArrayUnsafe()))
             completion.complete()
           }
         }
       }
       Transport.UDP -> {
-        TODO()
+        udpClient!!.send(Buffer.buffer(message.toBytes().toArrayUnsafe()), 
port, host) { handler ->
+          if (handler.failed()) {
+                completion.completeExceptionally(handler.cause())
+          } else {
+                completion.complete()
+            }
+        }
       }
       Transport.WS -> {
-        TODO()
+        httpClient!!.websocket(port, host, requestURI, { handler ->
+          handler.exceptionHandler(exceptionHandler)
+            .writeBinaryMessage(Buffer.buffer(message.toBytes().toArrayUnsafe())).end()
+          completion.complete()
+        },
+        { exception ->
+          completion.completeExceptionally(exception)
+        })
       }
     }
     completion.await()
@@ -201,15 +243,45 @@ class HobbitsTransport(
     if (started.compareAndSet(false, true)) {
       httpClient = vertx.createHttpClient()
       tcpClient = vertx.createNetClient()
-      udpClient = vertx.createDatagramSocket()
-
-      httpServer = vertx.createHttpServer()
-      tcpServer = vertx.createNetServer()
+      udpClient = 
vertx.createDatagramSocket().exceptionHandler(exceptionHandler)
 
       val completions = mutableListOf<AsyncCompletion>()
-      for (endpoint in httpEndpoints.values) {
+      for ((id, endpoint) in httpEndpoints) {
+        val completion = AsyncCompletion.incomplete()
+        val httpServer = vertx.createHttpServer()
+        httpServers[id] = httpServer
+
+        httpServer.requestHandler {
+          if (endpoint.requestURI == null || 
it.path().startsWith(endpoint.requestURI)) {
+            it.bodyHandler { 
endpoint.handler(Message.readMessage(Bytes.wrapBuffer(it))!!) }
+            it.response().statusCode = 200
+            it.response().end()
+          } else {
+            it.response().statusCode = 404
+            it.response().end()
+          }
+        }.listen(endpoint.port, endpoint.networkInterface) {
+          if (it.failed()) {
+            completion.completeExceptionally(it.cause())
+          } else {
+            completion.complete()
+          }
+        }
+        completions.add(completion)
+      }
+      for ((id, endpoint) in tcpEndpoints) {
         val completion = AsyncCompletion.incomplete()
-        httpServer!!.listen(endpoint.port, endpoint.networkInterface) {
+        val tcpServer = vertx.createNetServer()
+        tcpServers[id] = tcpServer
+        tcpServer.connectHandler { connectHandler -> connectHandler.handler { 
buffer ->
+          val message = Message.readMessage(Bytes.wrapBuffer(buffer))
+          if (message == null) {
+            TODO("Buffer not implemented yet")
+          } else {
+            endpoint.handler(message)
+          }
+        }
+        }.listen(endpoint.port, endpoint.networkInterface) {
           if (it.failed()) {
             completion.completeExceptionally(it.cause())
           } else {
@@ -218,9 +290,48 @@ class HobbitsTransport(
         }
         completions.add(completion)
       }
-      for (endpoint in tcpEndpoints.values) {
+      for ((id, endpoint) in udpEndpoints) {
         val completion = AsyncCompletion.incomplete()
-        tcpServer!!.listen(endpoint.port, endpoint.networkInterface) {
+
+        val udpServer = vertx.createDatagramSocket()
+        udpServers[id] = udpServer
+
+        udpServer.handler { packet ->
+          val message = Message.readMessage(Bytes.wrapBuffer(packet.data()))
+          if (message == null) {
+            TODO("Buffer not implemented yet")
+          } else {
+            endpoint.handler(message)
+          }
+        }.listen(endpoint.port, endpoint.networkInterface) {
+          if (it.failed()) {
+            completion.completeExceptionally(it.cause())
+          } else {
+            completion.complete()
+          }
+        }
+        completions.add(completion)
+      }
+      for ((id, endpoint) in wsEndpoints) {
+        val completion = AsyncCompletion.incomplete()
+        val httpServer = vertx.createHttpServer()
+        wsServers[id] = httpServer
+
+        httpServer.websocketHandler {
+          if (endpoint.requestURI == null || 
it.path().startsWith(endpoint.requestURI)) {
+            it.accept()
+
+            it.binaryMessageHandler { buffer ->
+              try {
+                endpoint.handler(Message.readMessage(Bytes.wrapBuffer(buffer))!!)
+              } finally {
+                it.end()
+              }
+            }
+          } else {
+            it.reject()
+          }
+        }.listen(endpoint.port, endpoint.networkInterface) {
           if (it.failed()) {
             completion.completeExceptionally(it.cause())
           } else {
@@ -238,6 +349,15 @@ class HobbitsTransport(
       httpClient!!.close()
       tcpClient!!.close()
       udpClient!!.close()
+      for (server in httpServers.values) {
+        server.close()
+      }
+      for (server in tcpServers.values) {
+        server.close()
+      }
+      for (server in udpServers.values) {
+        server.close()
+      }
     }
   }
 
@@ -254,7 +374,13 @@ class HobbitsTransport(
   }
 }
 
-internal data class Endpoint(val networkInterface: String, val port: Int, val 
tls: Boolean)
+internal data class Endpoint(
+  val networkInterface: String,
+  val port: Int,
+  val requestURI: String?,
+  val tls: Boolean,
+  val handler: (Message) -> Unit
+)
 
 enum class Transport() {
   HTTP,
diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
index 14c2cc3..259da84 100644
--- a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
+++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt
@@ -85,9 +85,19 @@ class Message(
 
   /**
    * Writes a message into bytes.
+   * @return the bytes of the message
    */
   fun toBytes(): Bytes {
     val requestLine = "$protocol $version $command ${headers.size()} 
${body.size()}\n"
     return Bytes.concatenate(Bytes.wrap(requestLine.toByteArray()), headers, 
body)
   }
+
+  /**
+   * Provides the size of the message
+   * @return the size of the message
+   */
+  fun size(): Int {
+    return protocol.length + 5 + version.length + command.length + 
headers.size().toString().length +
+      body.size().toString().length + headers.size() + body.size()
+  }
 }
diff --git a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
index 8ddac68..2984dd0 100644
--- a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
+++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt
@@ -57,7 +57,7 @@ class HobbitsTransportTest {
     val server = HobbitsTransport(vertx)
     server.start()
     val exception: IllegalStateException = assertThrows {
-      server.createHTTPEndpoint()
+      server.createHTTPEndpoint(handler = {})
     }
     assertEquals("Server already started", exception.message)
   }
diff --git a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/InteractionTest.kt b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/InteractionTest.kt
new file mode 100644
index 0000000..49eeb09
--- /dev/null
+++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/InteractionTest.kt
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuweni.hobbits
+
+import io.vertx.core.Vertx
+import kotlinx.coroutines.runBlocking
+import org.apache.tuweni.bytes.Bytes
+import org.apache.tuweni.junit.VertxExtension
+import org.apache.tuweni.junit.VertxInstance
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import java.util.concurrent.atomic.AtomicReference
+
+@ExtendWith(VertxExtension::class)
+class TCPPersistentTest {
+
+  @Test
+  fun testTwoTCPConnections(@VertxInstance vertx: Vertx) {
+    val ref = AtomicReference<Message>()
+    val client1 = HobbitsTransport(vertx)
+    val client2 = HobbitsTransport(vertx)
+    runBlocking {
+      client1.createTCPEndpoint("foo", port = 10000, handler = ref::set)
+      client1.start()
+      client2.start()
+      client2.sendMessage(
+        Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), 
headers = Bytes.random(16)),
+        Transport.TCP,
+        "0.0.0.0",
+        10000
+      )
+    }
+    Thread.sleep(200)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+    client1.stop()
+    client2.stop()
+  }
+
+  @Test
+  fun testTwoEndpoints(@VertxInstance vertx: Vertx) {
+    val ref = AtomicReference<Message>()
+    val ref2 = AtomicReference<Message>()
+    val client1 = HobbitsTransport(vertx)
+    val client2 = HobbitsTransport(vertx)
+    runBlocking {
+      client1.createTCPEndpoint("foo", port = 10000, handler = ref::set)
+      client1.createTCPEndpoint("bar", port = 10001, handler = ref2::set)
+      client1.start()
+      client2.start()
+      client2.sendMessage(
+        Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), 
headers = Bytes.random(16)),
+        Transport.TCP,
+        "0.0.0.0",
+        10000
+      )
+      client2.sendMessage(
+        Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), 
headers = Bytes.random(16)),
+        Transport.TCP,
+        "0.0.0.0",
+        10001
+      )
+    }
+    Thread.sleep(200)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body)
+    client1.stop()
+    client2.stop()
+  }
+}
+
+@ExtendWith(VertxExtension::class)
+class HTTPTest {
+  @Test
+  fun testTwoHTTPConnections(@VertxInstance vertx: Vertx) {
+    val ref = AtomicReference<Message>()
+    val client1 = HobbitsTransport(vertx)
+    val client2 = HobbitsTransport(vertx)
+
+    runBlocking {
+      client1.createHTTPEndpoint("foo", port = 10000, handler = ref::set)
+      client1.start()
+      client2.start()
+      client2.sendMessage(Message(command = "WHO", body = 
Bytes.fromHexString("deadbeef"),
+        headers = Bytes.random(16)), Transport.HTTP, "0.0.0.0", 10000)
+    }
+    Thread.sleep(200)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+    client1.stop()
+    client2.stop()
+  }
+
+  @Test
+  fun testTwoEndpoints(@VertxInstance vertx: Vertx) {
+    val ref = AtomicReference<Message>()
+    val ref2 = AtomicReference<Message>()
+    val client1 = HobbitsTransport(vertx)
+    val client2 = HobbitsTransport(vertx)
+    runBlocking {
+      client1.createHTTPEndpoint("foo", port = 10000, handler = ref::set)
+      client1.createHTTPEndpoint("bar", port = 10001, handler = ref2::set)
+      client1.start()
+      client2.start()
+      client2.sendMessage(
+        Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), 
headers = Bytes.random(16)),
+        Transport.HTTP,
+        "0.0.0.0",
+        10000
+      )
+      client2.sendMessage(
+        Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), 
headers = Bytes.random(16)),
+        Transport.HTTP,
+        "0.0.0.0",
+        10001
+      )
+    }
+    Thread.sleep(200)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body)
+    client1.stop()
+    client2.stop()
+  }
+}
+
+@ExtendWith(VertxExtension::class)
+class UDPTest {
+  @Test
+  fun testTwoUDPConnections(@VertxInstance vertx: Vertx) {
+    val ref = AtomicReference<Message>()
+    val client1 = HobbitsTransport(vertx)
+    val client2 = HobbitsTransport(vertx)
+
+    runBlocking {
+      client1.createUDPEndpoint("foo", port = 10000, handler = ref::set)
+      client1.start()
+      client2.start()
+      client2.sendMessage(Message(command = "WHO", body = 
Bytes.fromHexString("deadbeef"),
+        headers = Bytes.random(16)), Transport.UDP, "0.0.0.0", 10000)
+    }
+    Thread.sleep(200)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+    client1.stop()
+    client2.stop()
+  }
+
+  @Test
+  fun testTwoEndpoints(@VertxInstance vertx: Vertx) {
+    val ref = AtomicReference<Message>()
+    val ref2 = AtomicReference<Message>()
+    val client1 = HobbitsTransport(vertx)
+    val client2 = HobbitsTransport(vertx)
+    runBlocking {
+      client1.createUDPEndpoint("foo", port = 10000, handler = ref::set)
+      client1.createUDPEndpoint("bar", port = 10001, handler = ref2::set)
+      client1.start()
+      client2.start()
+      client2.sendMessage(
+        Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), 
headers = Bytes.random(16)),
+        Transport.UDP,
+        "0.0.0.0",
+        10000
+      )
+      client2.sendMessage(
+        Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), 
headers = Bytes.random(16)),
+        Transport.UDP,
+        "0.0.0.0",
+        10001
+      )
+    }
+    Thread.sleep(200)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body)
+    client1.stop()
+    client2.stop()
+  }
+}
+
+@ExtendWith(VertxExtension::class)
+class WebSocketTest {
+  @Test
+  fun testTwoWSConnections(@VertxInstance vertx: Vertx) {
+    vertx.exceptionHandler { it.printStackTrace() }
+    val ref = AtomicReference<Message>()
+    val client1 = HobbitsTransport(vertx)
+    val client2 = HobbitsTransport(vertx)
+
+    runBlocking {
+      client1.createWSEndpoint("foo", port = 10000, handler = ref::set)
+      client1.start()
+      client2.start()
+      client2.sendMessage(Message(command = "WHO", body = 
Bytes.fromHexString("deadbeef"),
+        headers = Bytes.random(16)), Transport.WS, "0.0.0.0", 10000)
+    }
+    Thread.sleep(200)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+    client1.stop()
+    client2.stop()
+  }
+
+  @Test
+  fun testTwoEndpoints(@VertxInstance vertx: Vertx) {
+    val ref = AtomicReference<Message>()
+    val ref2 = AtomicReference<Message>()
+    val client1 = HobbitsTransport(vertx)
+    val client2 = HobbitsTransport(vertx)
+    runBlocking {
+      client1.exceptionHandler { it.printStackTrace() }
+      client2.exceptionHandler { it.printStackTrace() }
+      client1.createWSEndpoint("foo", port = 11000, handler = ref::set)
+      client1.createWSEndpoint("bar", port = 11001, handler = ref2::set)
+      client1.start()
+      client2.start()
+      client2.sendMessage(
+        Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), 
headers = Bytes.random(16)),
+        Transport.WS,
+        "0.0.0.0",
+        11000
+      )
+      client2.sendMessage(
+        Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), 
headers = Bytes.random(16)),
+        Transport.WS,
+        "0.0.0.0",
+        11001
+      )
+    }
+    Thread.sleep(200)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body)
+    assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body)
+    client1.stop()
+    client2.stop()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org
For additional commands, e-mail: commits-h...@tuweni.apache.org

Reply via email to