[ https://issues.apache.org/jira/browse/KAFKA-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939739#comment-17939739 ]

Donny Nadolny commented on KAFKA-19012:
---------------------------------------

Here is the code for the custom partitioner we use, along with an explanation of what it does:

Normally, when you repartition a topic there is a race condition that results in what is effectively data loss if you have consumers using offset reset = latest. Say you go from 4 to 8 partitions, and a producer sees the new partitions and publishes to them before the consumer group has rebalanced and begun consuming from them. Shortly after that, the consumer group rebalances and begins consuming, but because it uses offset reset = latest it skips over the messages that were already published. To avoid that (and for some other reasons too) we have this code, which comes into effect while we're in the process of repartitioning a topic. First we create the new partitions (in the example above, readPartitions would go from 4 to 8 but writePartitions would remain 4) and allow consumers to begin consuming. That's when this code takes effect: even though there are now 8 partitions, for the purposes of producing we only publish to the original 4. Once all consumers have rebalanced, we update writePartitions to 8 and allow traffic to flow to all partitions. Note that many (maybe all?) of these misroutings occurred while we were not in the process of repartitioning a topic, i.e. readPartitions == writePartitions for both the original and the misrouted topic, so {{effectiveCluster}} doesn't change the cluster at all.
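To make that sequence concrete, here is a rough sketch of the repartitioning steps against the plain admin API. This is illustrative only, not our production tooling; the topic name, partition counts, and bootstrap address are placeholders.
{code:java}
import java.util.Collections
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}

// Illustrative sketch only: topic name, partition counts, and bootstrap address are placeholders.
object RepartitionSequenceSketch {
  def main(args: Array[String]): Unit = {
    val admin = AdminClient.create(
      Collections.singletonMap[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    )
    try {
      // Step 1: grow the topic from 4 to 8 partitions, i.e. readPartitions becomes 8.
      admin.createPartitions(Collections.singletonMap("topicA", NewPartitions.increaseTo(8))).all().get()

      // The race: a producer that refreshes its metadata now can write to partitions 4..7,
      // while a consumer with auto.offset.reset=latest that is assigned one of those new
      // partitions afterwards starts from the log end and never sees those records.

      // Step 2: with the partitioner below, writePartitions stays at 4, so producers keep
      //         writing only to partitions 0..3 until every consumer has rebalanced.
      // Step 3: only then is writePartitions raised to 8 and traffic flows to all partitions.
    } finally admin.close()
  }
}
{code}
The partitioner itself: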
{code:java}
package com.stripe.kafkatools.kproxy.partitioner

import java.util.concurrent.atomic.AtomicReference
import java.util.{Map => JMap}
import com.stripe.kafkatools.config.{ClusterName, Reloading}
import com.stripe.kafkatools.kproxy.config.KproxyConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.internals.DefaultPartitioner
import org.apache.kafka.common.Cluster
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.collection._
import scala.reflect.ClassTag

object KproxyPartitioner {
  private val ClusterName = "CLUSTER_NAME"
  private val ReloadingConfig = "RELOADING_KAFKA_CONFIG"
  def config(cluster: ClusterName, kafkaConfig: Reloading[KproxyConfig]): Map[String, Any] =
    Map(
      KproxyPartitioner.ClusterName -> cluster.name,
      KproxyPartitioner.ReloadingConfig -> kafkaConfig,
      ProducerConfig.PARTITIONER_CLASS_CONFIG -> classOf[KproxyPartitioner].getName
    )

  // only for testing
  def apply(cluster: ClusterName, config: Reloading[KproxyConfig]): KproxyPartitioner = {
    val partitioner = new KproxyPartitioner()
    val partitionerConfig = KproxyPartitioner.config(cluster, config)
    partitioner.configure(partitionerConfig.asJava)
    partitioner
  }
}

/**
 * For records with an unspecified partition, writes to a partition in the range [0, `partitions`),
 * where `partitions` is read from `KproxyConfig`. This ensures no data loss at the consumer
 * during a topic repartition.
 *
 * Partitioning Strategy:
 * <ul>
 *   <li> If a partition is specified in the record, use it
 *   <li> If no partition but a key is present, choose a partition in the range [0, `partitions`) based on a hash of the key
 *   <li> If no partition or key is present, choose the sticky partition that changes when the batch is full.
 * </ul>
 */
class KproxyPartitioner extends DefaultPartitioner {
  private[this] val logger = LoggerFactory.getLogger(getClass)
  // 2.15. SHOULD NOT use "var" as shared state
  private val kafkaConfigRef = new AtomicReference[Reloading[KproxyConfig]]()
  private val kafkaClusterNameRef = new AtomicReference[ClusterName]()

  private def effectiveCluster(topic: String, cluster: Cluster): Cluster = {
    val writePartitions = kafkaConfigRef
      .get()
      .current()
      .topics(topic)
      .partitions(kafkaClusterNameRef.get())
      .writable

    val effectiveCluster =
      if (writePartitions < cluster.partitionCountForTopic(topic)) {
        val updatedPartitions =
          cluster.partitionsForTopic(topic).asScala.filter(_.partition() < writePartitions)
        new Cluster(
          cluster.clusterResource.clusterId,
          cluster.nodes,
          updatedPartitions.asJavaCollection,
          cluster.unauthorizedTopics,
          cluster.invalidTopics,
          cluster.internalTopics,
          cluster.controller
        )
      } else
        cluster

    effectiveCluster
  }

  /**
   * Compute the partition for the given record.
   *
   * @param topic      The topic name
   * @param key        The key to partition on (or null if no key)
   * @param keyBytes   serialized key to partition on (or null if no key)
   * @param value      The value to partition on or null
   * @param valueBytes serialized value to partition on or null
   * @param cluster    The current cluster metadata
   */
  override def partition(
      topic: String,
      key: Any,
      keyBytes: Array[Byte],
      value: Any,
      valueBytes: Array[Byte],
      cluster: Cluster
  ): Int = {
    val newCluster = effectiveCluster(topic, cluster)
    super.partition(topic, key, keyBytes, value, valueBytes, newCluster)
  }

  override def configure(configs: JMap[String, _]): Unit = {

    def get[A: ClassTag](key: String): A =
      configs.asScala.get(key) match {
        case Some(value: A) => value
        case _              => sys.error(s"No value for $key")
      }

    kafkaConfigRef.set(get[Reloading[KproxyConfig]](KproxyPartitioner.ReloadingConfig))
    kafkaClusterNameRef.set(ClusterName(get[String](KproxyPartitioner.ClusterName)))
    logger.info(s"Configs passed to KproxyPartitioner: ${kafkaConfigRef.get()}")
  }

  /**
   * Applicable in case of batched writing without a specified key
   * @param topic
   * @param cluster
   * @param prevPartition
   */
  override def onNewBatch(topic: String, cluster: Cluster, prevPartition: Int): Unit = {
    val newCluster = effectiveCluster(topic, cluster)
    super.onNewBatch(topic, newCluster, prevPartition)
    ()
  }
} {code}
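For context on how this gets wired in, here is a simplified sketch of building a producer with this partitioner. The serializers, bootstrap address, and the way the {{ClusterName}} and {{Reloading[KproxyConfig]}} values are obtained are placeholders, not our real setup.
{code:java}
import java.util.{HashMap => JHashMap}

import com.stripe.kafkatools.config.{ClusterName, Reloading}
import com.stripe.kafkatools.kproxy.config.KproxyConfig
import com.stripe.kafkatools.kproxy.partitioner.KproxyPartitioner
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.ByteArraySerializer

// Simplified wiring sketch: serializers and bootstrap address are placeholders.
object ProducerWiringSketch {
  def buildProducer(
      cluster: ClusterName,
      kafkaConfig: Reloading[KproxyConfig]
  ): KafkaProducer[Array[Byte], Array[Byte]] = {
    val configs = new JHashMap[String, AnyRef]()
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)

    // KproxyPartitioner.config supplies partitioner.class plus the two entries that
    // KproxyPartitioner.configure reads back out: the cluster name and the reloading config.
    KproxyPartitioner.config(cluster, kafkaConfig).foreach { case (k, v) =>
      configs.put(k, v.asInstanceOf[AnyRef])
    }

    new KafkaProducer[Array[Byte], Array[Byte]](configs)
  }
}
{code}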
Here's the interceptor class, which is very rarely used; it's pretty self-explanatory:
{code:java}
package com.stripe.kafkatools.clients

import github.gphat.censorinus.DogStatsDClient
import org.apache.kafka.clients.producer.{ProducerInterceptor, ProducerRecord, RecordMetadata}
import java.util.{Map => JMap}

class ProducerMetricsInterceptor[K, V] extends ProducerInterceptor[K, V] {
  var metricsTags: List[String] = Nil
  private val successful = "success:true"
  private val unsuccessful = "success:false"

  private val stats: DogStatsDClient = statsClient
  private val totalMetric = "produce.interceptor.total"

  override def configure(configs: JMap[String, _]): Unit = {
    val clientId = produceClientId(configs).getOrElse("no-client-id")
    metricsTags = s"client_id:$clientId" :: Nil
  }

  override def onSend(record: ProducerRecord[K, V]): ProducerRecord[K, V] = {
    stats.increment(totalMetric, tags = metricsTags)
    try {
      record.headers.add(
        TimestampHeaders.Key,
        TimestampHeaders.fromDouble(TimestampHeaders.getCurrentTimestamp())
      )
    } catch {
      case _: IllegalStateException =>
        // Records may be in a read-only state if we've already attempted sending once
        // If we don't catch this, it'll log a failure (but not fail the send)
        ()
    }
    record
  }

  override def onAcknowledgement(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception == null)
      stats.increment(totalMetric, tags = successful :: metricsTags)
    else
      stats.increment(totalMetric, tags = unsuccessful :: metricsTags)

  override def close(): Unit = ()
} {code}
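In case it's useful context, an interceptor like this gets registered through the standard {{interceptor.classes}} producer config; a minimal sketch (the rest of the producer configuration is assumed to exist elsewhere):
{code:java}
import com.stripe.kafkatools.clients.ProducerMetricsInterceptor
import org.apache.kafka.clients.producer.ProducerConfig

// Minimal registration sketch: only the interceptor-related entry is shown.
object InterceptorRegistrationSketch {
  // interceptor.classes takes a comma-separated list of ProducerInterceptor class names.
  // The producer instantiates each class and calls configure() with its own configs,
  // which is where the interceptor above picks up the client.id for its metrics tags.
  val interceptorConfigs: Map[String, AnyRef] = Map(
    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG ->
      classOf[ProducerMetricsInterceptor[Array[Byte], Array[Byte]]].getName
  )
}
{code}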

> Messages ending up on the wrong topic
> -------------------------------------
>
>                 Key: KAFKA-19012
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19012
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 3.2.3, 3.8.1
>            Reporter: Donny Nadolny
>            Assignee: Kirk True
>            Priority: Major
>
> We're experiencing messages very occasionally ending up on a different topic 
> than what they were published to. That is, we publish a message to topicA and 
> consumers of topicB see it and fail to parse it because the message contents 
> are meant for topicA. This has happened for various topics. 
> We've begun adding a header with the intended topic (which we get just by 
> reading the topic from the record that we're about to pass to the OSS client) 
> right before we call producer.send, this header shows the correct topic 
> (which also matches up with the message contents itself). Similarly we're 
> able to use this header and compare it to the actual topic to prevent 
> consuming these misrouted messages, but this is still concerning.
> Some details:
>  - This happens rarely: it happened approximately once per 10 trillion 
> messages for a few months, though there was a period of a week or so where it 
> happened more frequently (once per 1 trillion messages or so)
>  - It often happens in a small burst, eg 2 or 3 messages very close in time 
> (but from different hosts) will be misrouted
>  - It often but not always coincides with some sort of event in the cluster 
> (a broker restarting or being replaced, network issues causing errors, etc). 
> Also these cluster events happen quite often with no misrouted messages
>  - We run many clusters, it has happened for several of them
>  - There is no pattern between intended and actual topic, other than the 
> intended topic tends to be higher volume ones (but I'd attribute that to 
> there being more messages published -> more occurrences affecting it rather 
> than it being more likely per-message)
>  - It only occurs with clients that are using a non-zero linger
>  - Once it happened with two sequential messages, both were intended for 
> topicA but both ended up on topicB, published by the same host (presumably 
> within the same linger batch)
>  - Most of our clients are 3.2.3 and it has only affected those, most of our 
> brokers are 3.2.3 but it has also happened with a cluster that's running 
> 3.8.1 (but I suspect a client rather than broker problem because of it never 
> happening with clients that use 0 linger)


