wenxuanguan commented on a change in pull request #25618:
[SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#discussion_r319332869
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala
##########
@@ -18,16 +18,160 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.google.common.cache._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.util.Utils
+
+/**
+ * A [[WriterCommitMessage]] for Kafka commit message.
+ * @param transactionalId Unique transactionalId for each producer.
+ * @param epoch Transactional epoch.
+ * @param producerId Transactional producerId for the producer, obtained when the transaction is initialized.
+ */
+private[kafka010] case class ProducerTransactionMetaData(
+ transactionalId: String,
+ epoch: Short,
+ producerId: Long)
+ extends WriterCommitMessage
+
+/**
+ * Empty commit message for resuming a transaction.
+ */
+private case object EmptyCommitMessage extends WriterCommitMessage
+
+private[kafka010] case object ProducerTransactionMetaData {
+ val VERSION = 1
+
+ def toTransactionId(
+ executorId: String,
+ taskIndex: String,
+ transactionalIdSuffix: String): String = {
+ toTransactionId(toProducerIdentity(executorId, taskIndex), transactionalIdSuffix)
+ }
+
+ def toTransactionId(producerIdentity: String, transactionalIdSuffix: String): String = {
+ s"$producerIdentity||$transactionalIdSuffix"
+ }
+
+ def toTransactionalIdSuffix(transactionalId: String): String = {
+ transactionalId.split("\\|\\|", 2)(1)
+ }
+
+ def toProducerIdentity(transactionalId: String): String = {
+ transactionalId.split("\\|\\|", 2)(0)
+ }
+
+ def toExecutorId(transactionalId: String): String = {
+ val producerIdentity = toProducerIdentity(transactionalId)
+ producerIdentity.split("-", 2)(0)
+ }
+
+ def toTaskIndex(transactionalId: String): String = {
+ val producerIdentity = toProducerIdentity(transactionalId)
+ producerIdentity.split("-", 2)(1)
+ }
+
+ def toProducerIdentity(executorId: String, taskIndex: String): String = {
+ s"$executorId-$taskIndex"
+ }
+}
+
+/**
+ * A [[DataWriter]] for Kafka transactional writing. One data writer will be created
+ * in each partition to process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
+ * from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+private[kafka010] class KafkaTransactionDataWriter(
+ targetTopic: Option[String],
+ producerParams: ju.Map[String, Object],
+ inputSchema: Seq[Attribute])
+ extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+
+ private lazy val producer = {
+ val kafkaProducer = CachedKafkaProducer.getOrCreate(producerParams)
Review comment:
I have considered a Kafka producer per executor. But if multiple tasks share one transaction and one of them fails and is retried on another executor, aborting that transaction would lose the data already written by the other tasks.
So, to avoid creating too many producers, tasks reuse producers that have already been created, and the config `producer.create.factor` limits the total number of producers in abnormal scenarios such as long-tail tasks.
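
To make the reuse-with-a-cap idea concrete, here is a minimal sketch of a per-executor producer pool built on a Guava `LoadingCache` (the same library imported in this file). The object name `TransactionalProducerPool`, the `maxProducers` constant, the bootstrap servers, the byte-array serializers, and the size-based eviction policy are all illustrative assumptions, not the PR's actual `CachedKafkaProducer`:

```scala
import java.{util => ju}

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

object TransactionalProducerPool {

  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]

  // Assumed per-executor cap standing in for the proposed `producer.create.factor`
  // limit; a real implementation would derive this from configuration.
  private val maxProducers = 16L

  private val removalListener = new RemovalListener[String, Producer] {
    override def onRemoval(notification: RemovalNotification[String, Producer]): Unit = {
      // Close evicted producers so their transactional ids can be fenced and reused.
      notification.getValue.close()
    }
  }

  private val cacheLoader = new CacheLoader[String, Producer] {
    override def load(transactionalId: String): Producer = {
      val props = new ju.Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed brokers
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArraySerializer")
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArraySerializer")
      props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
      val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
      // One-time transactional setup; Kafka assigns the producerId/epoch here.
      producer.initTransactions()
      producer
    }
  }

  // At most `maxProducers` live producers per executor; the least recently used
  // one is closed when the cap is exceeded (e.g. under long-tail tasks).
  private val cache: LoadingCache[String, Producer] =
    CacheBuilder.newBuilder()
      .maximumSize(maxProducers)
      .removalListener(removalListener)
      .build[String, Producer](cacheLoader)

  // Task attempts that map to the same transactionalId reuse one cached producer.
  def getOrCreate(transactionalId: String): Producer = cache.get(transactionalId)
}
```

A task attempt would then fetch its producer with something like `TransactionalProducerPool.getOrCreate(ProducerTransactionMetaData.toTransactionId(executorId, taskIndex, transactionalIdSuffix))`, using the `toTransactionId` helper from the diff above, so attempts that resolve to the same transactionalId on one executor share a producer instead of allocating a new one.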