jiwen624 commented on code in PR #46604:
URL: https://github.com/apache/spark/pull/46604#discussion_r1612041718


##########
core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala:
##########
@@ -161,3 +162,77 @@ private[spark] class StatsdReporter(
     }
 }
 
+/**
+ * Minimal transport abstraction for sending string payloads to a remote endpoint.
+ *
+ * The trait only fixes the method signatures; connection handling and wire framing
+ * are decided by each implementation.
+ */
+private trait DataSender {
+  /**
+   * Prepares the sender for use.
+   *
+   * @return a sender on which `send` can be called
+   */
+  def start(): DataSender
+
+  /** Sends one payload. */
+  def send(data: String): Unit
+
+  /** Releases whatever the sender holds. */
+  def close(): Unit
+
+  /** The (host, port) pair describing the local side of the sender. */
+  def localHostPort: (String, Int)
+}
+
+/**
+ * [[DataSender]] that ships every payload as one UDP datagram addressed to `host:port`.
+ *
+ * @param host destination host
+ * @param port destination port
+ */
+private class UDPDataSender(
+  val host: String,
+  val port: Int) extends DataSender {
+  private val socket = new DatagramSocket
+  // Destination of every datagram; built once from the constructor arguments.
+  private val address = new InetSocketAddress(host, port)
+
+  /** UDP needs no setup here, so this simply hands back the sender itself. */
+  override def start(): DataSender = this
+
+  /**
+   * Local (host, port) of the datagram socket.
+   *
+   * The host is `null` when the local address cannot be turned into a host string
+   * (any non-fatal failure while looking it up is swallowed).
+   */
+  override def localHostPort: (String, Int) = {
+    val localHost: String = Try(socket.getLocalAddress.getHostAddress).toOption.orNull
+    (localHost, socket.getLocalPort)
+  }
+
+  /** Encodes `data` as UTF-8 and sends it in a single datagram. */
+  override def send(data: String): Unit = {
+    val payload = data.getBytes(UTF_8)
+    socket.send(new DatagramPacket(payload, payload.length, address))
+  }
+
+  /** Closes the underlying datagram socket. */
+  override def close(): Unit = socket.close()
+}
+
+/**
+ * [[DataSender]] that writes newline-terminated payloads over a TCP connection
+ * to `host:port`.
+ *
+ * The connection is opened by `start()`; until then `send` has nothing to write to
+ * and does nothing.
+ *
+ * @param host          destination host
+ * @param port          destination port
+ * @param connTimeoutMs connect timeout, in milliseconds, passed to `Socket.connect`
+ */
+private class TCPDataSender(
+  val host: String,
+  val port: Int,
+  val connTimeoutMs: Int) extends DataSender {
+  private val socket = new Socket
+  // Stays None until start() has connected and obtained the socket's output stream.
+  private var dataOut: Option[DataOutputStream] = None
+
+  /**
+   * Connects to `host:port` (bounded by `connTimeoutMs`) and opens the output stream.
+   *
+   * Any exception from connecting or from obtaining the stream propagates to the
+   * caller; calling `close()` afterwards still releases the socket.
+   *
+   * @return this sender
+   */
+  override def start(): DataSender = {
+    socket.connect(new InetSocketAddress(host, port), connTimeoutMs)
+    dataOut = Some(new DataOutputStream(socket.getOutputStream))
+    this
+  }
+
+  /**
+   * Local (host, port) of the socket.
+   *
+   * The host is `null` when the local address cannot be turned into a host string
+   * (any non-fatal failure while looking it up is swallowed).
+   */
+  override def localHostPort: (String, Int) = {
+    val localHost = Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null)
+    (localHost, socket.getLocalPort)
+  }
+
+  /** Writes `data` followed by a newline, UTF-8 encoded; a no-op before `start()`. */
+  override def send(data: String): Unit = {
+    val bytes = (data + "\n").getBytes(UTF_8)
+    dataOut.foreach(_.write(bytes))
+  }
+
+  /**
+   * Closes the output stream (if one was opened) and then the socket itself.
+   *
+   * The socket is closed unconditionally so that it is released even when `start()`
+   * was never called or failed before the stream was assigned, and even when closing
+   * the stream throws.
+   */
+  override def close(): Unit = {
+    try {
+      dataOut.foreach(_.close())
+    } finally {
+      socket.close()
+    }
+  }
+}
+
+/** Transport protocols a `DataSender` can be created for. */
+private[spark] object DataSenderType extends Enumeration {
+  type DataSenderType = Value
+
+  /** Datagram transport. */
+  val UDP = Value
+
+  /** Stream transport. */
+  val TCP = Value
+}
+
+private object DataSender {
+  def get(host: String, port: Int, protocol: DataSenderType, connTimeoutMs: 
Int): DataSender = {
+    val ds = protocol match {
+      case TCP => new TCPDataSender(host, port, connTimeoutMs)

Review Comment:
   I'm not using long-lived TCP connections here. The considerations are:
   - For metrics we do not use the connection heavily; it's usually a 
single burst of a few metrics every few seconds (10s by default). It's 
very costly, even overwhelming, for the StatsD server to keep a large number 
of long-lived connections open, given that in production environments we usually 
have a ton of Spark applications/drivers/executors. 
       - The drawback to consider is that, after a short-lived TCP connection 
is closed, its port cannot be reused for a short while because of the TCP 
TIME_WAIT state. This is a tradeoff, and how much it matters depends somewhat 
on what the `interval` is.
   - Simplicity. To maintain a long-lived connection, we may have to handle a 
lot more failure scenarios, e.g., detecting a broken connection and reconnecting 
after a server-side glitch or transient network issue, and possibly adding a 
retry mechanism, perhaps with backoff and jitter. I'm not sure this complexity 
is expected, though, given that the current UDP mode is just fire-and-forget.
   
   So I don't see an ideal solution for all scenarios, please feel free to 
discuss if you have different opinions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to