jiwen624 commented on code in PR #46604:
URL: https://github.com/apache/spark/pull/46604#discussion_r1612041718


##########
core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala:
##########
@@ -161,3 +162,77 @@ private[spark] class StatsdReporter(
     }
 }
 
+private trait DataSender {
+  def start(): DataSender
+  def localHostPort: (String, Int)
+  def send(data: String): Unit
+  def close(): Unit
+}
+
+private class UDPDataSender(
+  val host: String,
+  val port: Int) extends DataSender {
+  private val socket = new DatagramSocket
+  private val address = new InetSocketAddress(host, port)
+
+  override def start(): DataSender = {
+    this
+  }
+
+  override def localHostPort: (String, Int) = {
+    (Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null), socket.getLocalPort)
+  }
+
+  override def send(data: String): Unit = {
+    val bytes = data.getBytes(UTF_8)
+    val packet = new DatagramPacket(bytes, bytes.length, address)
+    socket.send(packet)
+  }
+
+  override def close(): Unit = {
+    socket.close()
+  }
+}
+
+private class TCPDataSender(
+  val host: String,
+  val port: Int,
+  val connTimeoutMs: Int) extends DataSender {
+  private val socket = new Socket
+  private var dataOut: Option[DataOutputStream] = None
+
+  override def start(): DataSender = {
+    socket.connect(new InetSocketAddress(host, port), connTimeoutMs)
+    dataOut = Some(new DataOutputStream(socket.getOutputStream))
+    this
+  }
+
+  override def localHostPort: (String, Int) = {
+    (Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null), socket.getLocalPort)
+  }
+
+  override def send(data: String): Unit = {
+    val bytes = (data + "\n").getBytes(UTF_8)
+    dataOut.foreach(_.write(bytes))
+  }
+
+  override def close(): Unit = {
+    dataOut.foreach(_.close())
+  }
+}
+
+private[spark] object DataSenderType extends Enumeration {
+  type DataSenderType = Value
+  val UDP, TCP = Value
+}
+
+private object DataSender {
+  def get(host: String, port: Int, protocol: DataSenderType, connTimeoutMs: Int): DataSender = {
+    val ds = protocol match {
+      case TCP => new TCPDataSender(host, port, connTimeoutMs)

Review Comment:
   I'm not using long-lived TCP connections here, for the following reasons:
   - Metrics use the connection only lightly: a reporter usually sends a single burst of a few metrics per period, and the period is a few seconds long (10s by default). Keeping a large number of long-lived connections open is costly, even overwhelming, for the StatsD server, because production environments usually run a great many Spark applications, drivers, and executors.
       - The drawback is that with short-lived TCP connections, the client's local port cannot be reused for a while because it sits in the `TIME_WAIT` state after the close. This is a trade-off, and how much it matters depends partly on the reporting `interval`.
       - There is also the overhead of setting up and tearing down a TCP connection (handshake and close) every period.
   - Simplicity. Maintaining a long-lived connection means handling many more failure scenarios, e.g., detecting a broken connection and reconnecting after a server-side glitch or a transient network issue, possibly with a retry mechanism that uses backoff and jitter. I'm not sure that complexity is warranted, given that the current UDP mode is simply fire-and-forget.
   - Other considerations:
       - Short-lived TCP connections work better with load balancing and automatic failover when, say, the StatsD service runs on multiple hosts behind a load balancer (e.g., DNS, or pods behind a Kubernetes Service), since each new connection can be routed to a currently available backend. That is a long discussion in its own right.
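
   For concreteness, here is a minimal sketch of the short-lived pattern described above: one TCP connection per reporting period, carrying that period's newline-delimited metric lines and then closed. `ShortLivedStatsdTcp` and `sendBatch` are hypothetical names used only for illustration; they are not part of this PR.

   ```scala
   import java.net.{InetSocketAddress, Socket}
   import java.nio.charset.StandardCharsets.UTF_8

   // Hypothetical helper for illustration only; not part of the PR.
   object ShortLivedStatsdTcp {
     /** Opens one connection, writes all metric lines for this period, then closes it. */
     def sendBatch(host: String, port: Int, connTimeoutMs: Int, lines: Seq[String]): Unit = {
       val socket = new Socket()
       try {
         socket.connect(new InetSocketAddress(host, port), connTimeoutMs)
         // Newline-delimited lines, like TCPDataSender.send in the diff above.
         val payload = lines.map(_ + "\n").mkString
         val out = socket.getOutputStream
         out.write(payload.getBytes(UTF_8))
         out.flush()
       } finally {
         // Closing first means this (client) side typically enters TIME_WAIT.
         // This also releases the socket when connect() itself failed.
         socket.close()
       }
     }
   }
   ```

   On Linux, where `TIME_WAIT` lasts 60 seconds, a 10s interval leaves roughly six sockets per reporter in that state at any moment; that is the port-reuse cost mentioned above.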
   
   So I don't see a solution that is ideal for every scenario (which is probably why we offer both UDP and TCP modes). If it's worth it, we could also offer both short-lived and long-lived connections as options for TCP mode.
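
   To make the extra work of a long-lived option concrete, here is a rough sketch assuming a hypothetical `LongLivedTcpSender` (not in this PR). It keeps one socket open across periods and, after an I/O failure, drops metrics until a reconnect delay chosen by capped exponential backoff with full jitter has elapsed. The parameter names and defaults (`baseDelayMs`, `maxDelayMs`) are illustrative assumptions.

   ```scala
   import java.io.IOException
   import java.net.{InetSocketAddress, Socket}
   import java.nio.charset.StandardCharsets.UTF_8
   import scala.util.{Random, Try}

   // Hypothetical sketch for illustration only; not part of the PR.
   // Assumes calls come from a single reporter thread (no synchronization).
   class LongLivedTcpSender(
       host: String,
       port: Int,
       connTimeoutMs: Int,
       baseDelayMs: Long = 100L,
       maxDelayMs: Long = 10000L,
       rng: Random = new Random()) {

     private var socket: Option[Socket] = None
     private var failures = 0       // consecutive failed attempts
     private var nextAttemptAt = 0L // earliest time (ms) to try reconnecting

     /** Full jitter: uniform in [0, min(maxDelayMs, baseDelayMs * 2^attempt)). */
     def backoffMs(attempt: Int): Long = {
       val window = math.min(maxDelayMs, baseDelayMs * (1L << math.min(attempt, 30)))
       (rng.nextDouble() * window).toLong
     }

     /** Returns true if written; while backing off, drops the line (fire-and-forget). */
     def send(line: String, nowMs: Long = System.currentTimeMillis()): Boolean = {
       if (socket.isEmpty && nowMs < nextAttemptAt) return false
       try {
         val s = socket.getOrElse {
           val c = new Socket()
           socket = Some(c) // tracked so a failed connect() is still closed below
           c.connect(new InetSocketAddress(host, port), connTimeoutMs)
           c
         }
         s.getOutputStream.write((line + "\n").getBytes(UTF_8))
         failures = 0
         true
       } catch {
         case _: IOException =>
           socket.foreach(s => Try(s.close()))
           socket = None
           nextAttemptAt = nowMs + backoffMs(failures)
           failures += 1
           false
       }
     }

     def close(): Unit = {
       socket.foreach(s => Try(s.close()))
       socket = None
     }
   }
   ```

   Even this sketch skips cases a production version would need: for example, a write to a peer that has already gone away can succeed once before the failure surfaces, so some metrics may be lost silently.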
   
   Please feel free to discuss if you have different opinions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

