jiwen624 commented on code in PR #46604:
URL: https://github.com/apache/spark/pull/46604#discussion_r1612041718
##########
core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala:
##########
@@ -161,3 +162,77 @@ private[spark] class StatsdReporter(
}
}
+private trait DataSender {
+ def start(): DataSender
+ def localHostPort: (String, Int)
+ def send(data: String): Unit
+ def close(): Unit
+}
+
+private class UDPDataSender(
+ val host: String,
+ val port: Int) extends DataSender {
+ private val socket = new DatagramSocket
+ private val address = new InetSocketAddress(host, port)
+
+ override def start(): DataSender = {
+ this
+ }
+
+ override def localHostPort: (String, Int) = {
+ (Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null),
socket.getLocalPort)
+ }
+
+ override def send(data: String): Unit = {
+ val bytes = data.getBytes(UTF_8)
+ val packet = new DatagramPacket(bytes, bytes.length, address)
+ socket.send(packet)
+ }
+
+ override def close(): Unit = {
+ socket.close()
+ }
+}
+
+private class TCPDataSender(
+ val host: String,
+ val port: Int,
+ val connTimeoutMs: Int) extends DataSender {
+ private val socket = new Socket
+ private var dataOut: Option[DataOutputStream] = None
+
+ override def start(): DataSender = {
+ socket.connect(new InetSocketAddress(host, port), connTimeoutMs)
+ dataOut = Some(new DataOutputStream(socket.getOutputStream))
+ this
+ }
+
+ override def localHostPort: (String, Int) = {
+ (Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null),
socket.getLocalPort)
+ }
+
+ override def send(data: String): Unit = {
+ val bytes = (data + "\n").getBytes(UTF_8)
+ dataOut.foreach(_.write(bytes))
+ }
+
+ override def close(): Unit = {
+ dataOut.foreach(_.close())
+ }
+}
+
+private[spark] object DataSenderType extends Enumeration {
+ type DataSenderType = Value
+ val UDP, TCP = Value
+}
+
+private object DataSender {
+ def get(host: String, port: Int, protocol: DataSenderType, connTimeoutMs:
Int): DataSender = {
+ val ds = protocol match {
+ case TCP => new TCPDataSender(host, port, connTimeoutMs)
Review Comment:
I'm not using long-lived TCP connections here. The considerations are:
- For metrics we do not use the connection heavily; it is usually a single
burst of a few metrics once per reporting interval (10s by default). It is
very costly, even overwhelming, for the StatsD server to keep a large number
of long-lived connections open, given that in production environments we
usually have a ton of Spark applications/drivers/executors.
- The drawback to consider is that, with short-lived TCP connections, the
port cannot be reused for a short while after the connection is closed because
of the TCP TIME_WAIT state. This is a tradeoff, and how much it matters
depends somewhat on what the `interval` is.
- Simplicity. To maintain a long-lived connection, we would have to handle
many more failure scenarios, e.g., detecting a broken connection and
reconnecting after a server-side glitch or a transient network issue, and
possibly a retry mechanism, perhaps with backoff and jitter. I'm not sure this
complexity is expected, though, given that the current UDP mode is just
fire-and-forget.
So I don't see an ideal solution for all scenarios. Please feel free to
discuss if you have a different opinion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]