[GitHub] [spark] wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming

2019-08-29 Thread GitBox
wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#discussion_r319342797
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala
 ##
 @@ -18,16 +18,160 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.google.common.cache._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.util.Utils
+
+/**
+ * A [[WriterCommitMessage]] for Kafka commit message.
+ * @param transactionalId Unique transactionalId for each producer.
+ * @param epoch Transactional epoch.
+ * @param producerId Transactional producerId for the producer, obtained when the
+ *                   transaction is initialized.
+ */
+private[kafka010] case class ProducerTransactionMetaData(
+    transactionalId: String,
+    epoch: Short,
+    producerId: Long)
+  extends WriterCommitMessage
+
+/**
+ * Empty commit message for resuming a transaction.
+ */
+private case object EmptyCommitMessage extends WriterCommitMessage
+
+private[kafka010] case object ProducerTransactionMetaData {
+  val VERSION = 1
+
+  def toTransactionId(
+      executorId: String,
+      taskIndex: String,
+      transactionalIdSuffix: String): String = {
+    toTransactionId(toProducerIdentity(executorId, taskIndex), transactionalIdSuffix)
+  }
+
+  def toTransactionId(producerIdentity: String, transactionalIdSuffix: String): String = {
+    s"$producerIdentity||$transactionalIdSuffix"
+  }
+
+  def toTransactionalIdSuffix(transactionalId: String): String = {
+    transactionalId.split("\\|\\|", 2)(1)
+  }
+
+  def toProducerIdentity(transactionalId: String): String = {
+    transactionalId.split("\\|\\|", 2)(0)
+  }
+
+  def toExecutorId(transactionalId: String): String = {
+    val producerIdentity = toProducerIdentity(transactionalId)
+    producerIdentity.split("-", 2)(0)
+  }
+
+  def toTaskIndex(transactionalId: String): String = {
+    val producerIdentity = toProducerIdentity(transactionalId)
+    producerIdentity.split("-", 2)(1)
+  }
+
+  def toProducerIdentity(executorId: String, taskIndex: String): String = {
+    s"$executorId-$taskIndex"
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka transactional writing. One data writer will be created
+ * in each partition to process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be
+ *                    inferred from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+private[kafka010] class KafkaTransactionDataWriter(
+    targetTopic: Option[String],
+    producerParams: ju.Map[String, Object],
+    inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+
+  private lazy val producer = {
+    val kafkaProducer = CachedKafkaProducer.getOrCreate(producerParams)
 
 Review comment:
  I think the caching logic is fine: we can control producer creation per task, and we also get failover via the transactional.id in producerParams.
  A transactional producer is not thread safe, so what I do is use one producer per task within a micro-batch; the next batch then reuses the already-created producer instead of recreating one, since every transaction completes within its micro-batch. With producerParams, the transactional.id differs between tasks within one micro-batch but stays the same across micro-batches.
  And if the number of tasks per executor is the same in every micro-batch, no new producers are created after the first micro-batch.
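  To make the reuse pattern concrete, here is a minimal sketch of the caching described above (the `TaskProducerCache` object and the `query-uuid` suffix are hypothetical illustrations, not code from this PR). The transactional.id is built with the helpers in the diff, e.g. `ProducerTransactionMetaData.toTransactionId("2", "5", "query-uuid")` yields `2-5||query-uuid`, which stays stable across micro-batches:

    // Hypothetical sketch: one transactional producer per task slot, keyed by a
    // transactional.id that is stable across micro-batches.
    import java.util.concurrent.ConcurrentHashMap
    import java.util.{Properties => JProperties}
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

    object TaskProducerCache {
      private val cache =
        new ConcurrentHashMap[String, KafkaProducer[Array[Byte], Array[Byte]]]()

      // Assumes key/value serializers are already set in `params`.
      def getOrCreate(
          transactionalId: String,
          params: JProperties): KafkaProducer[Array[Byte], Array[Byte]] = {
        cache.computeIfAbsent(transactionalId, (id: String) => {
          params.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, id)
          val producer = new KafkaProducer[Array[Byte], Array[Byte]](params)
          // initTransactions() fences any zombie producer still holding the
          // same transactional.id, which is what makes retry/failover safe.
          producer.initTransactions()
          producer
        })
      }
    }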


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming

2019-08-29 Thread GitBox
wenxuanguan commented on a change in pull request #25618: [SPARK-28908][SS]Implement Kafka EOS sink for Structured Streaming
URL: https://github.com/apache/spark/pull/25618#discussion_r319332869
 
 

 ##
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataWriter.scala
 ##
 @@ -18,16 +18,160 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.google.common.cache._
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.util.Utils
+
+/**
+ * A [[WriterCommitMessage]] for Kafka commit message.
+ * @param transactionalId Unique transactionalId for each producer.
+ * @param epoch Transactional epoch.
+ * @param producerId Transactional producerId for the producer, obtained when the
+ *                   transaction is initialized.
+ */
+private[kafka010] case class ProducerTransactionMetaData(
+    transactionalId: String,
+    epoch: Short,
+    producerId: Long)
+  extends WriterCommitMessage
+
+/**
+ * Empty commit message for resuming a transaction.
+ */
+private case object EmptyCommitMessage extends WriterCommitMessage
+
+private[kafka010] case object ProducerTransactionMetaData {
+  val VERSION = 1
+
+  def toTransactionId(
+      executorId: String,
+      taskIndex: String,
+      transactionalIdSuffix: String): String = {
+    toTransactionId(toProducerIdentity(executorId, taskIndex), transactionalIdSuffix)
+  }
+
+  def toTransactionId(producerIdentity: String, transactionalIdSuffix: String): String = {
+    s"$producerIdentity||$transactionalIdSuffix"
+  }
+
+  def toTransactionalIdSuffix(transactionalId: String): String = {
+    transactionalId.split("\\|\\|", 2)(1)
+  }
+
+  def toProducerIdentity(transactionalId: String): String = {
+    transactionalId.split("\\|\\|", 2)(0)
+  }
+
+  def toExecutorId(transactionalId: String): String = {
+    val producerIdentity = toProducerIdentity(transactionalId)
+    producerIdentity.split("-", 2)(0)
+  }
+
+  def toTaskIndex(transactionalId: String): String = {
+    val producerIdentity = toProducerIdentity(transactionalId)
+    producerIdentity.split("-", 2)(1)
+  }
+
+  def toProducerIdentity(executorId: String, taskIndex: String): String = {
+    s"$executorId-$taskIndex"
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka transactional writing. One data writer will be created
+ * in each partition to process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be
+ *                    inferred from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+private[kafka010] class KafkaTransactionDataWriter(
+    targetTopic: Option[String],
+    producerParams: ju.Map[String, Object],
+    inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+
+  private lazy val producer = {
+    val kafkaProducer = CachedKafkaProducer.getOrCreate(producerParams)
 
 Review comment:
  I have considered one Kafka producer per executor. But when multiple tasks share one transaction and some task fails and retries on another executor, aborting the transaction loses the data written by the other tasks.
  So, to avoid creating too many producers, tasks reuse already-created producers, and the config `producer.create.factor` limits the total number of producers in abnormal scenarios, such as long-tail tasks.
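  As a rough illustration of that cap (assuming `producer.create.factor` scales the allowed producers per task slot; the exact semantics in this PR may differ), a bounded Guava cache can evict and close surplus producers:

    // Hypothetical sketch: bound the number of cached producers and close the
    // ones that get evicted, so long-tail tasks do not leak connections.
    import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
    import org.apache.kafka.clients.producer.KafkaProducer

    object BoundedProducerCache {
      type ByteProducer = KafkaProducer[Array[Byte], Array[Byte]]

      def create(taskSlots: Int, createFactor: Double): Cache[String, ByteProducer] = {
        CacheBuilder.newBuilder()
          // Assumed cap: task slots times the configured creation factor.
          .maximumSize((taskSlots * createFactor).toLong)
          .removalListener(new RemovalListener[String, ByteProducer] {
            override def onRemoval(n: RemovalNotification[String, ByteProducer]): Unit = {
              // Guava invokes this on eviction or explicit removal; close the
              // producer so its connections and transaction state are released.
              n.getValue.close()
            }
          })
          .build[String, ByteProducer]()
      }
    }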


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org