This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push: new 7191c74 Hobbits implementation for TCP/HTTP/UDP/WS. Still WIP 7191c74 is described below commit 7191c7498d3da3c15e21b57866dc7f8cff0595d4 Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Fri Jun 7 13:31:14 2019 -0700 Hobbits implementation for TCP/HTTP/UDP/WS. Still WIP --- .../org/apache/tuweni/hobbits/HobbitsTransport.kt | 178 ++++++++++++--- .../kotlin/org/apache/tuweni/hobbits/Message.kt | 10 + .../apache/tuweni/hobbits/HobbitsTransportTest.kt | 2 +- .../org/apache/tuweni/hobbits/InteractionTest.kt | 246 +++++++++++++++++++++ 4 files changed, 409 insertions(+), 27 deletions(-) diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt index 4c1d16c..8f3ddf0 100644 --- a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt +++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/HobbitsTransport.kt @@ -26,6 +26,7 @@ import io.vertx.core.net.NetClient import io.vertx.core.net.NetServer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import org.apache.tuweni.bytes.Bytes import org.apache.tuweni.concurrent.AsyncCompletion import org.apache.tuweni.concurrent.coroutines.await import java.lang.IllegalArgumentException @@ -38,7 +39,7 @@ import kotlin.coroutines.CoroutineContext /** * Hobbits is a peer-to-peer transport stack specified at https://www.github.com/deltap2p/hobbits. * - * This class works as a transport mechanism that can leverage a variety of network transport + * This class works as a transport mech anism that can leverage a variety of network transport * mechanisms, such as TCP, HTTP, UDP and Web sockets. * * It can be used to contact other Hobbits endpoints, or to expose endpoints to the network. @@ -56,27 +57,42 @@ class HobbitsTransport( private val udpEndpoints = mutableMapOf<String, Endpoint>() private val wsEndpoints = mutableMapOf<String, Endpoint>() + private var exceptionHandler: ((Throwable) -> Unit)? = { } + private var httpClient: HttpClient? = null private var tcpClient: NetClient? = null private var udpClient: DatagramSocket? = null - private var httpServer: HttpServer? = null - private var tcpServer: NetServer? = null + private val httpServers = mutableMapOf<String, HttpServer>() + private val tcpServers = mutableMapOf<String, NetServer>() + private val udpServers = mutableMapOf<String, DatagramSocket>() + private val wsServers = mutableMapOf<String, HttpServer>() + + /** + * Sets an exception handler that will be called whenever an exception occurs during transport. + */ + fun exceptionHandler(handler: (Throwable) -> Unit) { + exceptionHandler = handler + } /** * Creates a new endpoint over http. * @param networkInterface the network interface to bind the endpoint to * @param port the port to serve traffic from + * @param requestURI the request URI path to match * @param tls whether the endpoint should be secured using TLS + * @param handler function called when a message is received */ fun createHTTPEndpoint( id: String = "default", networkInterface: String = "0.0.0.0", port: Int = 9337, - tls: Boolean = false + requestURI: String? = null, + tls: Boolean = false, + handler: (Message) -> Unit ) { checkNotStarted() - httpEndpoints[id] = Endpoint(networkInterface, port, tls) + httpEndpoints[id] = Endpoint(networkInterface, port, requestURI, tls, handler) } /** @@ -84,15 +100,17 @@ class HobbitsTransport( * @param networkInterface the network interface to bind the endpoint to * @param port the port to serve traffic from * @param tls whether the endpoint should be secured using TLS + * @param handler function called when a message is received */ fun createTCPEndpoint( id: String = "default", networkInterface: String = "0.0.0.0", port: Int = 9237, - tls: Boolean = false + tls: Boolean = false, + handler: (Message) -> Unit ) { checkNotStarted() - tcpEndpoints[id] = Endpoint(networkInterface, port, tls) + tcpEndpoints[id] = Endpoint(networkInterface, port, null, tls, handler) } /** @@ -100,43 +118,54 @@ class HobbitsTransport( * @param networkInterface the network interface to bind the endpoint to * @param port the port to serve traffic from * @param tls whether the endpoint should be secured using TLS + * @param handler function called when a message is received */ - fun createUDPEndpoint(id: String = "default", networkInterface: String = "0.0.0.0", port: Int = 9137) { + fun createUDPEndpoint( + id: String = "default", + networkInterface: String = "0.0.0.0", + port: Int = 9137, + handler: (Message) -> Unit + ) { checkNotStarted() - udpEndpoints[id] = Endpoint(networkInterface, port, false) + udpEndpoints[id] = Endpoint(networkInterface, port, null, false, handler) } /** * Creates a new endpoint over websocket connections. * @param networkInterface the network interface to bind the endpoint to * @param port the port to serve traffic from + * @param requestURI the request URI path to match * @param tls whether the endpoint should be secured using TLS + * @param handler function called when a message is received */ fun createWSEndpoint( id: String = "default", networkInterface: String = "0.0.0.0", port: Int = 9037, - tls: Boolean = false + requestURI: String? = null, + tls: Boolean = false, + handler: (Message) -> Unit ) { checkNotStarted() - wsEndpoints[id] = Endpoint(networkInterface, port, tls) + wsEndpoints[id] = Endpoint(networkInterface, port, requestURI, tls, handler) } /** * Sends a message using the transport specified. * */ - suspend fun sendMessage(message: Message, transport: Transport, host: String, port: Int) { + suspend fun sendMessage(message: Message, transport: Transport, host: String, port: Int, requestURI: String = "") { checkStarted() val completion = AsyncCompletion.incomplete() when (transport) { Transport.HTTP -> { @Suppress("DEPRECATION") - val req = httpClient!!.request(HttpMethod.POST, port, host, "/").handler { + val req = httpClient!!.request(HttpMethod.POST, port, host, requestURI) + .exceptionHandler(exceptionHandler).handler { if (it.statusCode() == 200) { completion.complete() } else { - completion.completeExceptionally(RuntimeException()) + completion.completeExceptionally(RuntimeException("${it.statusCode()}")) } } req.end(Buffer.buffer(message.toBytes().toArrayUnsafe())) @@ -146,16 +175,29 @@ class HobbitsTransport( if (res.failed()) { completion.completeExceptionally(res.cause()) } else { - res.result().end(Buffer.buffer(message.toBytes().toArrayUnsafe())) + res.result().exceptionHandler(exceptionHandler).end(Buffer.buffer(message.toBytes().toArrayUnsafe())) completion.complete() } } } Transport.UDP -> { - TODO() + udpClient!!.send(Buffer.buffer(message.toBytes().toArrayUnsafe()), port, host) { handler -> + if (handler.failed()) { + completion.completeExceptionally(handler.cause()) + } else { + completion.complete() + } + } } Transport.WS -> { - TODO() + httpClient!!.websocket(port, host, requestURI, { handler -> + handler.exceptionHandler(exceptionHandler) + .writeBinaryMessage(Buffer.buffer(message.toBytes().toArrayUnsafe())).end() + completion.complete() + }, + { exception -> + completion.completeExceptionally(exception) + }) } } completion.await() @@ -201,15 +243,45 @@ class HobbitsTransport( if (started.compareAndSet(false, true)) { httpClient = vertx.createHttpClient() tcpClient = vertx.createNetClient() - udpClient = vertx.createDatagramSocket() - - httpServer = vertx.createHttpServer() - tcpServer = vertx.createNetServer() + udpClient = vertx.createDatagramSocket().exceptionHandler(exceptionHandler) val completions = mutableListOf<AsyncCompletion>() - for (endpoint in httpEndpoints.values) { + for ((id, endpoint) in httpEndpoints) { + val completion = AsyncCompletion.incomplete() + val httpServer = vertx.createHttpServer() + httpServers[id] = httpServer + + httpServer.requestHandler { + if (endpoint.requestURI == null || it.path().startsWith(endpoint.requestURI)) { + it.bodyHandler { endpoint.handler(Message.readMessage(Bytes.wrapBuffer(it))!!) } + it.response().statusCode = 200 + it.response().end() + } else { + it.response().statusCode = 404 + it.response().end() + } + }.listen(endpoint.port, endpoint.networkInterface) { + if (it.failed()) { + completion.completeExceptionally(it.cause()) + } else { + completion.complete() + } + } + completions.add(completion) + } + for ((id, endpoint) in tcpEndpoints) { val completion = AsyncCompletion.incomplete() - httpServer!!.listen(endpoint.port, endpoint.networkInterface) { + val tcpServer = vertx.createNetServer() + tcpServers[id] = tcpServer + tcpServer.connectHandler { connectHandler -> connectHandler.handler { buffer -> + val message = Message.readMessage(Bytes.wrapBuffer(buffer)) + if (message == null) { + TODO("Buffer not implemented yet") + } else { + endpoint.handler(message) + } + } + }.listen(endpoint.port, endpoint.networkInterface) { if (it.failed()) { completion.completeExceptionally(it.cause()) } else { @@ -218,9 +290,48 @@ class HobbitsTransport( } completions.add(completion) } - for (endpoint in tcpEndpoints.values) { + for ((id, endpoint) in udpEndpoints) { val completion = AsyncCompletion.incomplete() - tcpServer!!.listen(endpoint.port, endpoint.networkInterface) { + + val udpServer = vertx.createDatagramSocket() + udpServers[id] = udpServer + + udpServer.handler { packet -> + val message = Message.readMessage(Bytes.wrapBuffer(packet.data())) + if (message == null) { + TODO("Buffer not implemented yet") + } else { + endpoint.handler(message) + } + }.listen(endpoint.port, endpoint.networkInterface) { + if (it.failed()) { + completion.completeExceptionally(it.cause()) + } else { + completion.complete() + } + } + completions.add(completion) + } + for ((id, endpoint) in wsEndpoints) { + val completion = AsyncCompletion.incomplete() + val httpServer = vertx.createHttpServer() + wsServers[id] = httpServer + + httpServer.websocketHandler { + if (endpoint.requestURI == null || it.path().startsWith(endpoint.requestURI)) { + it.accept() + + it.binaryMessageHandler { buffer -> + try { + endpoint.handler(Message.readMessage(Bytes.wrapBuffer(buffer))!!) + } finally { + it.end() + } + } + } else { + it.reject() + } + }.listen(endpoint.port, endpoint.networkInterface) { if (it.failed()) { completion.completeExceptionally(it.cause()) } else { @@ -238,6 +349,15 @@ class HobbitsTransport( httpClient!!.close() tcpClient!!.close() udpClient!!.close() + for (server in httpServers.values) { + server.close() + } + for (server in tcpServers.values) { + server.close() + } + for (server in udpServers.values) { + server.close() + } } } @@ -254,7 +374,13 @@ class HobbitsTransport( } } -internal data class Endpoint(val networkInterface: String, val port: Int, val tls: Boolean) +internal data class Endpoint( + val networkInterface: String, + val port: Int, + val requestURI: String?, + val tls: Boolean, + val handler: (Message) -> Unit +) enum class Transport() { HTTP, diff --git a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt index 14c2cc3..259da84 100644 --- a/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt +++ b/hobbits/src/main/kotlin/org/apache/tuweni/hobbits/Message.kt @@ -85,9 +85,19 @@ class Message( /** * Writes a message into bytes. + * @return the bytes of the message */ fun toBytes(): Bytes { val requestLine = "$protocol $version $command ${headers.size()} ${body.size()}\n" return Bytes.concatenate(Bytes.wrap(requestLine.toByteArray()), headers, body) } + + /** + * Provides the size of the message + * @return the size of the message + */ + fun size(): Int { + return protocol.length + 5 + version.length + command.length + headers.size().toString().length + + body.size().toString().length + headers.size() + body.size() + } } diff --git a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt index 8ddac68..2984dd0 100644 --- a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt +++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/HobbitsTransportTest.kt @@ -57,7 +57,7 @@ class HobbitsTransportTest { val server = HobbitsTransport(vertx) server.start() val exception: IllegalStateException = assertThrows { - server.createHTTPEndpoint() + server.createHTTPEndpoint(handler = {}) } assertEquals("Server already started", exception.message) } diff --git a/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/InteractionTest.kt b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/InteractionTest.kt new file mode 100644 index 0000000..49eeb09 --- /dev/null +++ b/hobbits/src/test/kotlin/org/apache/tuweni/hobbits/InteractionTest.kt @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tuweni.hobbits + +import io.vertx.core.Vertx +import kotlinx.coroutines.runBlocking +import org.apache.tuweni.bytes.Bytes +import org.apache.tuweni.junit.VertxExtension +import org.apache.tuweni.junit.VertxInstance +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import java.util.concurrent.atomic.AtomicReference + +@ExtendWith(VertxExtension::class) +class TCPPersistentTest { + + @Test + fun testTwoTCPConnections(@VertxInstance vertx: Vertx) { + val ref = AtomicReference<Message>() + val client1 = HobbitsTransport(vertx) + val client2 = HobbitsTransport(vertx) + runBlocking { + client1.createTCPEndpoint("foo", port = 10000, handler = ref::set) + client1.start() + client2.start() + client2.sendMessage( + Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), headers = Bytes.random(16)), + Transport.TCP, + "0.0.0.0", + 10000 + ) + } + Thread.sleep(200) + assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body) + client1.stop() + client2.stop() + } + + @Test + fun testTwoEndpoints(@VertxInstance vertx: Vertx) { + val ref = AtomicReference<Message>() + val ref2 = AtomicReference<Message>() + val client1 = HobbitsTransport(vertx) + val client2 = HobbitsTransport(vertx) + runBlocking { + client1.createTCPEndpoint("foo", port = 10000, handler = ref::set) + client1.createTCPEndpoint("bar", port = 10001, handler = ref2::set) + client1.start() + client2.start() + client2.sendMessage( + Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), headers = Bytes.random(16)), + Transport.TCP, + "0.0.0.0", + 10000 + ) + client2.sendMessage( + Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), headers = Bytes.random(16)), + Transport.TCP, + "0.0.0.0", + 10001 + ) + } + Thread.sleep(200) + assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body) + assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body) + client1.stop() + client2.stop() + } +} + +@ExtendWith(VertxExtension::class) +class HTTPTest { + @Test + fun testTwoHTTPConnections(@VertxInstance vertx: Vertx) { + val ref = AtomicReference<Message>() + val client1 = HobbitsTransport(vertx) + val client2 = HobbitsTransport(vertx) + + runBlocking { + client1.createHTTPEndpoint("foo", port = 10000, handler = ref::set) + client1.start() + client2.start() + client2.sendMessage(Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), + headers = Bytes.random(16)), Transport.HTTP, "0.0.0.0", 10000) + } + Thread.sleep(200) + assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body) + client1.stop() + client2.stop() + } + + @Test + fun testTwoEndpoints(@VertxInstance vertx: Vertx) { + val ref = AtomicReference<Message>() + val ref2 = AtomicReference<Message>() + val client1 = HobbitsTransport(vertx) + val client2 = HobbitsTransport(vertx) + runBlocking { + client1.createHTTPEndpoint("foo", port = 10000, handler = ref::set) + client1.createHTTPEndpoint("bar", port = 10001, handler = ref2::set) + client1.start() + client2.start() + client2.sendMessage( + Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), headers = Bytes.random(16)), + Transport.HTTP, + "0.0.0.0", + 10000 + ) + client2.sendMessage( + Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), headers = Bytes.random(16)), + Transport.HTTP, + "0.0.0.0", + 10001 + ) + } + Thread.sleep(200) + assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body) + assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body) + client1.stop() + client2.stop() + } +} + +@ExtendWith(VertxExtension::class) +class UDPTest { + @Test + fun testTwoUDPConnections(@VertxInstance vertx: Vertx) { + val ref = AtomicReference<Message>() + val client1 = HobbitsTransport(vertx) + val client2 = HobbitsTransport(vertx) + + runBlocking { + client1.createUDPEndpoint("foo", port = 10000, handler = ref::set) + client1.start() + client2.start() + client2.sendMessage(Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), + headers = Bytes.random(16)), Transport.UDP, "0.0.0.0", 10000) + } + Thread.sleep(200) + assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body) + client1.stop() + client2.stop() + } + + @Test + fun testTwoEndpoints(@VertxInstance vertx: Vertx) { + val ref = AtomicReference<Message>() + val ref2 = AtomicReference<Message>() + val client1 = HobbitsTransport(vertx) + val client2 = HobbitsTransport(vertx) + runBlocking { + client1.createUDPEndpoint("foo", port = 10000, handler = ref::set) + client1.createUDPEndpoint("bar", port = 10001, handler = ref2::set) + client1.start() + client2.start() + client2.sendMessage( + Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), headers = Bytes.random(16)), + Transport.UDP, + "0.0.0.0", + 10000 + ) + client2.sendMessage( + Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), headers = Bytes.random(16)), + Transport.UDP, + "0.0.0.0", + 10001 + ) + } + Thread.sleep(200) + assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body) + assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body) + client1.stop() + client2.stop() + } +} + +@ExtendWith(VertxExtension::class) +class WebSocketTest { + @Test + fun testTwoWSConnections(@VertxInstance vertx: Vertx) { + vertx.exceptionHandler { it.printStackTrace() } + val ref = AtomicReference<Message>() + val client1 = HobbitsTransport(vertx) + val client2 = HobbitsTransport(vertx) + + runBlocking { + client1.createWSEndpoint("foo", port = 10000, handler = ref::set) + client1.start() + client2.start() + client2.sendMessage(Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), + headers = Bytes.random(16)), Transport.WS, "0.0.0.0", 10000) + } + Thread.sleep(200) + assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body) + client1.stop() + client2.stop() + } + + @Test + fun testTwoEndpoints(@VertxInstance vertx: Vertx) { + val ref = AtomicReference<Message>() + val ref2 = AtomicReference<Message>() + val client1 = HobbitsTransport(vertx) + val client2 = HobbitsTransport(vertx) + runBlocking { + client1.exceptionHandler { it.printStackTrace() } + client2.exceptionHandler { it.printStackTrace() } + client1.createWSEndpoint("foo", port = 11000, handler = ref::set) + client1.createWSEndpoint("bar", port = 11001, handler = ref2::set) + client1.start() + client2.start() + client2.sendMessage( + Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), headers = Bytes.random(16)), + Transport.WS, + "0.0.0.0", + 11000 + ) + client2.sendMessage( + Message(command = "WHO", body = Bytes.fromHexString("deadbeef"), headers = Bytes.random(16)), + Transport.WS, + "0.0.0.0", + 11001 + ) + } + Thread.sleep(200) + assertEquals(Bytes.fromHexString("deadbeef"), ref.get().body) + assertEquals(Bytes.fromHexString("deadbeef"), ref2.get().body) + client1.stop() + client2.stop() + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org For additional commands, e-mail: commits-h...@tuweni.apache.org