[ https://issues.apache.org/jira/browse/KAFKA-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939739#comment-17939739 ]
Donny Nadolny commented on KAFKA-19012:
---------------------------------------

Here is the code for the custom partitioner we use. An explanation of what it does:

Normally when you repartition a topic, there is a race condition that results in what is effectively data loss if you have consumers that are using offset reset = latest. Say you go from 4 to 8 partitions, and the producer sees the new partitions and publishes to them before the consumer group has rebalanced and begun consuming from the new partitions. Shortly after that, the consumer group rebalances and begins consuming, but because it uses offset reset = latest it will skip over the messages that were already published.

To avoid that (and for some other reasons too) we have this code, which comes into effect when we're in the process of repartitioning a topic. First we create the new partitions (in the example above, readPartitions would go from 4 to 8, but writePartitions would remain 4) and allow consumers to begin consuming. That's when this code takes effect: even though there are 8 partitions, for the purposes of producing we'll only publish to the original 4. Once all consumers have rebalanced, we can update writePartitions to 8 and allow traffic to flow through to all partitions.

Many (maybe all?) of these misroutings have occurred when we are not in the process of repartitioning a topic, i.e. readPartitions == writePartitions for both the original and the misrouted topic, so {{effectiveCluster}} doesn't change the cluster at all.
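As a rough illustration of the readPartitions / writePartitions split described above (this is not the actual partitioner, which follows in Scala), keyed partition selection can be clamped to the writable range roughly like this. It is a sketch under assumptions: the class and method names are hypothetical, and {{Arrays.hashCode}} is only a stand-in for the murmur2 hash that Kafka's DefaultPartitioner applies to the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical, dependency-free sketch; it does not use the Kafka client API.
class WritePartitionSketch {

    // Chooses a partition for a keyed record, never exceeding the writable range.
    // readPartitions: partitions that exist on the topic (e.g. 8 mid-repartition).
    // writePartitions: partitions producers may currently publish to (e.g. 4).
    static int choosePartition(byte[] keyBytes, int readPartitions, int writePartitions) {
        // While repartitioning, writePartitions < readPartitions, so only the
        // original partitions are eligible. Otherwise the two values are equal
        // and the clamp changes nothing.
        int effective = Math.min(readPartitions, writePartitions);
        // Stand-in hash (assumption); floorMod keeps the result non-negative.
        return Math.floorMod(Arrays.hashCode(keyBytes), effective);
    }

    public static void main(String[] args) {
        byte[] key = "order-123".getBytes(StandardCharsets.UTF_8);
        // Mid-repartition: 8 partitions exist, only the first 4 are written to.
        System.out.println(choosePartition(key, 8, 4));
        // After all consumers have rebalanced: every partition is writable.
        System.out.println(choosePartition(key, 8, 8));
    }
}
```

When readPartitions == writePartitions the clamp is a no-op, which matches the observation above that the misroutings happened while {{effectiveCluster}} was returning the cluster unchanged.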
{code:java}
package com.stripe.kafkatools.kproxy.partitioner

import java.util.concurrent.atomic.AtomicReference
import java.util.{Map => JMap}

import com.stripe.kafkatools.config.{ClusterName, Reloading}
import com.stripe.kafkatools.kproxy.config.KproxyConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.internals.DefaultPartitioner
import org.apache.kafka.common.Cluster
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.collection._
import scala.reflect.ClassTag

object KproxyPartitioner {
  private val ClusterName = "CLUSTER_NAME"
  private val ReloadingConfig = "RELOADING_KAFKA_CONFIG"

  def config(cluster: ClusterName, kafkaConfig: Reloading[KproxyConfig]): Map[String, Any] =
    Map(
      KproxyPartitioner.ClusterName -> cluster.name,
      KproxyPartitioner.ReloadingConfig -> kafkaConfig,
      ProducerConfig.PARTITIONER_CLASS_CONFIG -> classOf[KproxyPartitioner].getName
    )

  // only for testing
  def apply(cluster: ClusterName, config: Reloading[KproxyConfig]): KproxyPartitioner = {
    val partitioner = new KproxyPartitioner()
    val partitionerConfig = KproxyPartitioner.config(cluster, config)
    partitioner.configure(partitionerConfig.asJava)
    partitioner
  }
}

/**
 * For records with unspecified partition writes it to partitions in range [0, `partitions`)
 * where `partitions` is read from `KproxyConfig`. This ensures no data loss at consumer during topic repartition.
 *
 * Partitioning Strategy:
 * <ul>
 * <li> If a partition is specified in a record, use it
 * <li> If no partition but a key is present choose a partition in range [0, `partitions`) based on a hash of the key
 * <li> If no partition or key is present choose the sticky partition that changes when the batch is full.
 * </ul>
 */
class KproxyPartitioner extends DefaultPartitioner {
  private[this] val logger = LoggerFactory.getLogger(getClass)

  // 2.15. SHOULD NOT use "var" as shared state
  private val kafkaConfigRef = new AtomicReference[Reloading[KproxyConfig]]()
  private val kafkaClusterNameRef = new AtomicReference[ClusterName]()

  private def effectiveCluster(topic: String, cluster: Cluster): Cluster = {
    val writePartitions = kafkaConfigRef
      .get()
      .current()
      .topics(topic)
      .partitions(kafkaClusterNameRef.get())
      .writable

    val effectiveCluster =
      if (writePartitions < cluster.partitionCountForTopic(topic)) {
        val updatedPartitions =
          cluster.partitionsForTopic(topic).asScala.filter(_.partition() < writePartitions)
        new Cluster(
          cluster.clusterResource.clusterId,
          cluster.nodes,
          updatedPartitions.asJavaCollection,
          cluster.unauthorizedTopics,
          cluster.invalidTopics,
          cluster.internalTopics,
          cluster.controller
        )
      } else cluster

    effectiveCluster
  }

  /**
   * Compute the partition for the given record.
   *
   * @param topic The topic name
   * @param key The key to partition on (or null if no key)
   * @param keyBytes serialized key to partition on (or null if no key)
   * @param value The value to partition on or null
   * @param valueBytes serialized value to partition on or null
   * @param cluster The current cluster metadata
   */
  override def partition(
    topic: String,
    key: Any,
    keyBytes: Array[Byte],
    value: Any,
    valueBytes: Array[Byte],
    cluster: Cluster
  ): Int = {
    val newCluster = effectiveCluster(topic, cluster)
    super.partition(topic, key, keyBytes, value, valueBytes, newCluster)
  }

  override def configure(configs: JMap[String, _]): Unit = {
    def get[A: ClassTag](key: String): A =
      configs.asScala.get(key) match {
        case Some(value: A) => value
        case _ => sys.error(s"No value for $key")
      }

    kafkaConfigRef.set(get[Reloading[KproxyConfig]](KproxyPartitioner.ReloadingConfig))
    kafkaClusterNameRef.set(ClusterName(get[String](KproxyPartitioner.ClusterName)))
    logger.info(s"Configs passed to KproxyPartitioner: ${kafkaConfigRef.get()}")
  }

  /**
   * Applicable in case of batched writing without a specified key
   * @param topic
   * @param cluster
   * @param prevPartition
   */
  override def onNewBatch(topic: String, cluster: Cluster, prevPartition: Int): Unit = {
    val newCluster = effectiveCluster(topic, cluster)
    super.onNewBatch(topic, newCluster, prevPartition)
    ()
  }
}
{code}

Here's the interceptor class, which is very rarely used; it's pretty self-explanatory:

{code:java}
package com.stripe.kafkatools.clients

import github.gphat.censorinus.DogStatsDClient
import org.apache.kafka.clients.producer.{ProducerInterceptor, ProducerRecord, RecordMetadata}
import java.util.{Map => JMap}

class ProducerMetricsInterceptor[K, V] extends ProducerInterceptor[K, V] {
  var metricsTags: List[String] = Nil

  private val successful = "success:true"
  private val unsuccessful = "success:false"
  private val stats: DogStatsDClient = statsClient
  private val totalMetric = "produce.interceptor.total"

  override def configure(configs: JMap[String, _]): Unit = {
    val clientId = produceClientId(configs).getOrElse("no-client-id")
    metricsTags = s"client_id:$clientId" :: Nil
  }

  override def onSend(record: ProducerRecord[K, V]): ProducerRecord[K, V] = {
    stats.increment(totalMetric, tags = metricsTags)
    try {
      record.headers.add(
        TimestampHeaders.Key,
        TimestampHeaders.fromDouble(TimestampHeaders.getCurrentTimestamp())
      )
    } catch {
      case _: IllegalStateException =>
        // Records may be in a read-only state if we've already attempted sending once
        // If we don't catch this, it'll log a failure (but not fail the send)
        ()
    }
    record
  }

  override def onAcknowledgement(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception == null)
      stats.increment(totalMetric, tags = successful :: metricsTags)
    else
      stats.increment(totalMetric, tags = unsuccessful :: metricsTags)

  override def close(): Unit = ()
}
{code}

> Messages ending up on the wrong topic
> -------------------------------------
>
> Key: KAFKA-19012
> URL: https://issues.apache.org/jira/browse/KAFKA-19012
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 3.2.3, 3.8.1
> Reporter: Donny Nadolny
> Assignee: Kirk True
> Priority: Major
>
> We're experiencing messages very occasionally ending up on a different topic
> than what they were published to. That is, we publish a message to topicA and
> consumers of topicB see it and fail to parse it because the message contents
> are meant for topicA. This has happened for various topics.
> We've begun adding a header with the intended topic (which we get just by
> reading the topic from the record that we're about to pass to the OSS client)
> right before we call producer.send, this header shows the correct topic
> (which also matches up with the message contents itself). Similarly we're
> able to use this header and compare it to the actual topic to prevent
> consuming these misrouted messages, but this is still concerning.
> Some details:
> - This happens rarely: it happened approximately once per 10 trillion
> messages for a few months, though there was a period of a week or so where it
> happened more frequently (once per 1 trillion messages or so)
> - It often happens in a small burst, eg 2 or 3 messages very close in time
> (but from different hosts) will be misrouted
> - It often but not always coincides with some sort of event in the cluster
> (a broker restarting or being replaced, network issues causing errors, etc).
> Also these cluster events happen quite often with no misrouted messages
> - We run many clusters, it has happened for several of them
> - There is no pattern between intended and actual topic, other than the
> intended topic tends to be higher volume ones (but I'd attribute that to
> there being more messages published -> more occurrences affecting it rather
> than it being more likely per-message)
> - It only occurs with clients that are using a non-zero linger
> - Once it happened with two sequential messages, both were intended for
> topicA but both ended up on topicB, published by the same host (presumably
> within the same linger batch)
> - Most of our clients are 3.2.3 and it has only affected those, most of our
> brokers are 3.2.3 but it has also happened with a cluster that's running
> 3.8.1 (but I suspect a client rather than broker problem because of it never
> happening with clients that use 0 linger)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)