jiwen624 commented on code in PR #46604:
URL: https://github.com/apache/spark/pull/46604#discussion_r1612041718
##########
core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala:
##########
@@ -161,3 +162,77 @@ private[spark] class StatsdReporter(
}
}
+private trait DataSender {
+ def start(): DataSender
+ def localHostPort: (String, Int)
+ def send(data: String): Unit
+ def close(): Unit
+}
+
+private class UDPDataSender(
+ val host: String,
+ val port: Int) extends DataSender {
+ private val socket = new DatagramSocket
+ private val address = new InetSocketAddress(host, port)
+
+ override def start(): DataSender = {
+ this
+ }
+
+ override def localHostPort: (String, Int) = {
+ (Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null), socket.getLocalPort)
+ }
+
+ override def send(data: String): Unit = {
+ val bytes = data.getBytes(UTF_8)
+ val packet = new DatagramPacket(bytes, bytes.length, address)
+ socket.send(packet)
+ }
+
+ override def close(): Unit = {
+ socket.close()
+ }
+}
+
+private class TCPDataSender(
+ val host: String,
+ val port: Int,
+ val connTimeoutMs: Int) extends DataSender {
+ private val socket = new Socket
+ private var dataOut: Option[DataOutputStream] = None
+
+ override def start(): DataSender = {
+ socket.connect(new InetSocketAddress(host, port), connTimeoutMs)
+ dataOut = Some(new DataOutputStream(socket.getOutputStream))
+ this
+ }
+
+ override def localHostPort: (String, Int) = {
+ (Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null), socket.getLocalPort)
+ }
+
+ override def send(data: String): Unit = {
+ val bytes = (data + "\n").getBytes(UTF_8)
+ dataOut.foreach(_.write(bytes))
+ }
+
+ override def close(): Unit = {
+ dataOut.foreach(_.close())
+ }
+}
+
+private[spark] object DataSenderType extends Enumeration {
+ type DataSenderType = Value
+ val UDP, TCP = Value
+}
+
+private object DataSender {
+ def get(host: String, port: Int, protocol: DataSenderType, connTimeoutMs: Int): DataSender = {
+ val ds = protocol match {
+ case TCP => new TCPDataSender(host, port, connTimeoutMs)
Review Comment:
I'm not using long-lived TCP connections here, for the following reasons:
- Metrics traffic is light: each reporting period (10s by default) usually sends a single burst of a few metrics. Keeping a large number of long-lived connections open is costly for the StatsD server and can even overwhelm it, given that production environments usually run a great many Spark applications, drivers, and executors.
- The drawback of short-lived TCP connections is that, after each close, the local port stays in the TIME_WAIT state for a while and cannot be reused immediately. This is a tradeoff, and how much it matters depends on the `interval`.
- There is also the cost of setting up and tearing down a TCP connection for every report.
- Simplicity. Maintaining a long-lived connection means handling many more failure scenarios, e.g., detecting a broken connection and reconnecting after a server-side glitch or a transient network issue, possibly with a retry mechanism that uses backoff and jitter. I'm not sure this complexity is warranted, given that the current UDP mode is simply fire-and-forget.
- Other considerations:
  - Short-lived TCP connections work better with load balancing and automatic failover when, for example, the StatsD service runs on multiple hosts behind a load balancer, because each new connection can be routed to a currently healthy host. That is a long discussion in itself and probably beyond the scope of this PR.
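To make the first two points concrete, here is a minimal sketch of the short-lived pattern: one connect, write, and close per report. `ShortLivedTcpSketch`, `sendReport`, and `timeWaitSockets` are hypothetical helpers, not code from this PR, and the TIME_WAIT estimate assumes a fixed TIME_WAIT duration (e.g. 60s, the Linux default; other OSes use different values) and a server that closes only after reading to EOF.

```scala
import java.io.BufferedOutputStream
import java.net.{InetSocketAddress, Socket}
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helpers illustrating the short-lived TCP pattern; not code from the PR.
object ShortLivedTcpSketch {
  // Opens a fresh connection, writes one report as newline-terminated StatsD
  // lines, and closes it, so every reporting interval costs one connect/close.
  def sendReport(host: String, port: Int, connTimeoutMs: Int, lines: Seq[String]): Unit = {
    val socket = new Socket
    try {
      socket.connect(new InetSocketAddress(host, port), connTimeoutMs)
      // Buffer the whole report so it goes out in as few writes as possible.
      val out = new BufferedOutputStream(socket.getOutputStream)
      lines.foreach(line => out.write((line + "\n").getBytes(UTF_8)))
      out.flush()
    } finally {
      // Always close the socket (this also closes its output stream), even when
      // connect fails. Assuming the server waits for EOF, this side closes first,
      // so its local port is the one that enters TIME_WAIT.
      socket.close()
    }
  }

  // Rough steady-state number of this reporter's ports sitting in TIME_WAIT:
  // one new connection per interval, each lingering for timeWaitSec after close.
  def timeWaitSockets(timeWaitSec: Int, intervalSec: Int): Int =
    math.ceil(timeWaitSec.toDouble / intervalSec).toInt
}
```

For example, with a 60s TIME_WAIT and the default 10s interval, `timeWaitSockets(60, 10)` gives 6: each reporter only holds a handful of ports in TIME_WAIT at a time, and a shorter `interval` raises that number proportionally.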
So I don't see a solution that is ideal for every scenario, which is probably why we offer both UDP and TCP modes. Feel free to share if you have a different opinion.
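For comparison, the "Simplicity" point can be made concrete: a long-lived connection would need at least a reconnect loop along these lines. This is a sketch assuming "full jitter" exponential backoff; `ReconnectBackoffSketch`, `connectWithRetry`, `backoffDelayMs`, and their parameters are hypothetical and are not part of this PR or of Spark.

```scala
import scala.util.{Random, Try}

// Hypothetical reconnect helper for a long-lived connection; not part of the PR or Spark.
object ReconnectBackoffSketch {
  // "Full jitter" exponential backoff: the delay before retry number `attempt`
  // (0-based) is drawn uniformly from [0, min(capMs, baseMs * 2^attempt)).
  def backoffDelayMs(attempt: Int, baseMs: Long, capMs: Long, rnd: Random): Long = {
    val ceiling = math.min(capMs, baseMs << math.min(attempt, 30))
    (rnd.nextDouble() * ceiling).toLong
  }

  // Calls `connect` until it succeeds or `maxAttempts` calls have failed,
  // sleeping with jittered backoff between attempts. Returns None on give-up.
  def connectWithRetry[T](maxAttempts: Int, baseMs: Long, capMs: Long)(connect: () => T): Option[T] = {
    val rnd = new Random
    var attempt = 0
    var result: Option[T] = None
    while (result.isEmpty && attempt < maxAttempts) {
      result = Try(connect()).toOption
      attempt += 1
      // Only back off if another attempt will actually follow.
      if (result.isEmpty && attempt < maxAttempts) {
        Thread.sleep(backoffDelayMs(attempt - 1, baseMs, capMs, rnd))
      }
    }
    result
  }
}
```

Even this sketch leaves out detecting a half-open connection before writing and deciding what to do with metrics produced while reconnecting, which is the kind of extra complexity the short-lived approach avoids.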
--
This is an automated message from the Apache Git Service.