[GitHub] [spark] wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming
wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#discussion_r319342797

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala ##
@@ -18,16 +18,160 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.google.common.cache._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.util.Utils
+
+/**
+ * A [[WriterCommitMessage]] for Kafka commit message.
+ * @param transactionalId Unique transactionalId for each producer.
+ * @param epoch Transactional epoch.
+ * @param producerId Transactional producerId for producer, got when init transaction.
+ */
+private[kafka010] case class ProducerTransactionMetaData(
+    transactionalId: String,
+    epoch: Short,
+    producerId: Long)
+  extends WriterCommitMessage
+
+/**
+ * Empty commit message for resume transaction.
+ */
+private case object EmptyCommitMessage extends WriterCommitMessage
+
+private[kafka010] case object ProducerTransactionMetaData {
+  val VERSION = 1
+
+  def toTransactionId(
+      executorId: String,
+      taskIndex: String,
+      transactionalIdSuffix: String): String = {
+    toTransactionId(toProducerIdentity(executorId, taskIndex), transactionalIdSuffix)
+  }
+
+  def toTransactionId(producerIdentity: String, transactionalIdSuffix: String): String = {
+    s"$producerIdentity||$transactionalIdSuffix"
+  }
+
+  def toTransactionalIdSuffix(transactionalId: String): String = {
+    transactionalId.split("\\|\\|", 2)(1)
+  }
+
+  def toProducerIdentity(transactionalId: String): String = {
+    transactionalId.split("\\|\\|", 2)(0)
+  }
+
+  def toExecutorId(transactionalId: String): String = {
+    val producerIdentity = toProducerIdentity(transactionalId)
+    producerIdentity.split("-", 2)(0)
+  }
+
+  def toTaskIndex(transactionalId: String): String = {
+    val producerIdentity = toProducerIdentity(transactionalId)
+    producerIdentity.split("-", 2)(1)
+  }
+
+  def toProducerIdentity(executorId: String, taskIndex: String): String = {
+    s"$executorId-$taskIndex"
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka transactional writing. One data writer will be created
+ * in each partition to process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
+ *                    from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+private[kafka010] class KafkaTransactionDataWriter(
+    targetTopic: Option[String],
+    producerParams: ju.Map[String, Object],
+    inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+
+  private lazy val producer = {
+    val kafkaProducer = CachedKafkaProducer.getOrCreate(producerParams)

Review comment:
I think the caching logic is fine: it lets us control producer creation per task, and also handle failover through the transactional.id in producerParams. A transactional producer is not thread-safe, so I use one producer per task within a micro-batch. Because each transaction completes within its micro-batch, the next micro-batch reuses the already created producer instead of creating a new one. Through producerParams, transactional.id differs between the tasks of one micro-batch but stays the same in the next micro-batch. So if each executor runs the same number of tasks in every micro-batch, no new producers are created after the first micro-batch.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
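The producer reuse described in the first review comment can be sketched in plain Scala. This is a minimal, hypothetical model, not the PR's `CachedKafkaProducer`: `SketchTransactionalProducer`, `SketchProducerCache`, `sketchTransactionalId` and `runMicroBatch` are invented names, no Kafka client is used, and the transactional.id suffix is assumed to stay constant across micro-batches. It only shows why keying the cache on producerParams (including `transactional.id`) means one producer per task slot, reused in later micro-batches.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a transactional Kafka producer. It only tracks
// whether a transaction is open; it is not the Kafka client API.
final class SketchTransactionalProducer(val transactionalId: String) {
  private var inTransaction = false

  def beginTransaction(): Unit = {
    require(!inTransaction, s"$transactionalId already has an open transaction")
    inTransaction = true
  }

  def commitTransaction(): Unit = {
    require(inTransaction, s"$transactionalId has no open transaction")
    inTransaction = false
  }
}

// Hypothetical cache keyed by the producer parameters (which include
// transactional.id): equal parameters always map to the same producer.
object SketchProducerCache {
  private val cache = mutable.Map.empty[Map[String, String], SketchTransactionalProducer]
  private var created = 0

  def getOrCreate(params: Map[String, String]): SketchTransactionalProducer = synchronized {
    cache.getOrElseUpdate(params, {
      created += 1 // only runs on a cache miss
      new SketchTransactionalProducer(params("transactional.id"))
    })
  }

  def createdCount: Int = synchronized(created)
}

// Same "<executorId>-<taskIndex>||<suffix>" layout as
// ProducerTransactionMetaData.toTransactionId in the diff.
def sketchTransactionalId(executorId: String, taskIndex: Int, suffix: String): String =
  s"$executorId-$taskIndex||$suffix"

// One micro-batch on one executor: each task gets (or reuses) the producer for
// its own transactional.id and finishes its transaction before the batch ends.
def runMicroBatch(executorId: String, numTasks: Int, suffix: String): Unit =
  (0 until numTasks).foreach { taskIndex =>
    val params = Map("transactional.id" -> sketchTransactionalId(executorId, taskIndex, suffix))
    val producer = SketchProducerCache.getOrCreate(params)
    producer.beginTransaction()
    // ... the task would send its rows here ...
    producer.commitTransaction()
  }
```

Calling `runMicroBatch("1", 3, "batch")` twice leaves `createdCount` at 3: the second micro-batch finds all three producers in the cache, because the transaction from the first micro-batch was already committed.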
[GitHub] [spark] wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming
wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#discussion_r319332869

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala ##
+  private lazy val producer = {
+    val kafkaProducer = CachedKafkaProducer.getOrCreate(producerParams)

Review comment:
I have considered one Kafka producer per executor. But if multiple tasks share one transaction and one of them fails and is retried on another executor, the shared transaction has to be aborted, and the data already written to it by the other tasks is lost. So, to avoid creating too many producers, each task reuses an already created producer instead. In abnormal cases, such as long-tail tasks, the config `producer.create.factor` limits the total number of producers.
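The data-loss argument in the second review comment can be illustrated with a small in-memory model. This is a hypothetical sketch, not the PR's code and not the Kafka client API: `SketchTransaction`, the record names and the failure points are invented, and commits are simplified to happen once the surviving tasks are done. It compares one transaction shared by all tasks on an executor with one transaction per task when task 1 fails and is retried elsewhere.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of a transaction on one topic: sent records stay
// pending until commit() publishes them; abort() discards them.
final class SketchTransaction(topic: mutable.Buffer[String]) {
  private val pending = mutable.Buffer.empty[String]

  def send(record: String): Unit = pending += record
  def commit(): Unit = { topic ++= pending; pending.clear() }
  def abort(): Unit = pending.clear()
}

// One transaction shared by all tasks on executor A. Task 0 succeeds; task 1
// fails after one row and is retried on executor B. Committing A would publish
// task 1's partial row (a duplicate after the retry), so A must be aborted,
// which also drops task 0's rows. Task 0 is not re-run since it succeeded.
def sharedTransactionPerExecutor(): List[String] = {
  val topic = mutable.Buffer.empty[String]
  val executorA = new SketchTransaction(topic)
  executorA.send("t0-r0")
  executorA.send("t0-r1")
  executorA.send("t1-r0") // task 1 fails here
  executorA.abort()
  val retryOnB = new SketchTransaction(topic)
  retryOnB.send("t1-r0")
  retryOnB.send("t1-r1")
  retryOnB.commit()
  topic.toList
}

// One transaction per task: only the failed attempt's transaction is aborted.
def transactionPerTask(): List[String] = {
  val topic = mutable.Buffer.empty[String]
  val task0 = new SketchTransaction(topic)
  task0.send("t0-r0")
  task0.send("t0-r1")
  val task1Attempt1 = new SketchTransaction(topic)
  task1Attempt1.send("t1-r0") // task 1 fails here
  task1Attempt1.abort()
  val task1Attempt2 = new SketchTransaction(topic)
  task1Attempt2.send("t1-r0")
  task1Attempt2.send("t1-r1")
  // Commit only after every task has succeeded.
  task0.commit()
  task1Attempt2.commit()
  topic.toList
}
```

In this model the shared variant ends up with only task 1's rows, while the per-task variant keeps every row exactly once. This is the trade-off the comment describes: per-task transactions cost more producers, which is why the producers are reused across micro-batches.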