Re: Broker Exceptions
Yes, Jiangjie, I do see lots of these "Starting preferred replica leader election for partitions" messages in the logs. I also see a lot of produce request failure warnings with the NotLeaderForPartition exception. I tried setting auto.leader.rebalance.enable to false, but I am still noticing the rebalance happening. My understanding was that the rebalance will not happen when this is set to false.

Thanks
Zakee

On Feb 25, 2015, at 5:17 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote:

I don't think num.replica.fetchers will help in this case. Increasing the number of fetcher threads only helps where a broker has a large amount of incoming data and more replica fetcher threads are needed to keep up. We usually use only 1-2 per broker. But in your case, it looks like leader migration caused the issue. Do you see anything else in the log, like a preferred leader election?

Jiangjie (Becket) Qin

On 2/25/15, 5:02 PM, Zakee kzak...@netzero.net wrote:

Thanks, Jiangjie. Yes, I do see under-replicated partitions usually spiking every hour. Is there anything I could try to reduce it? How does num.replica.fetchers affect the replica sync? I currently have 7 configured on each of 5 brokers.

-Zakee

On Wed, Feb 25, 2015 at 4:17 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

These messages are usually caused by leader migration. I think as long as you don't see this lasting forever along with a bunch of under-replicated partitions, it should be fine.

Jiangjie (Becket) Qin

On 2/25/15, 4:07 PM, Zakee kzak...@netzero.net wrote:

I need to know whether I should be worried about these or can ignore them. I see tons of these exceptions/warnings in the broker logs, and I am not sure what causes them or what could be done to fix them.

ERROR [ReplicaFetcherThread-3-5], Error for partition [TestTopic] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-02-25 11:01:41,785] ERROR [ReplicaFetcherThread-3-5], Error for partition [TestTopic] to broker 5:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)
[2015-02-25 11:01:41,785] WARN [Replica Manager on Broker 2]: Fetch request with correlation id 950084 from client ReplicaFetcherThread-1-2 on partition [TestTopic,2] failed due to Leader not local for partition [TestTopic,2] on broker 2 (kafka.server.ReplicaManager)

Any ideas?

-Zakee
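For reference, the broker-side settings involved in this thread (a sketch assuming a 0.8.x broker; the values shown are the defaults):

    # server.properties
    auto.leader.rebalance.enable=true            # set to false to stop automatic preferred-leader rebalancing
    leader.imbalance.check.interval.seconds=300  # how often the controller checks for leader imbalance
    leader.imbalance.per.broker.percentage=10    # imbalance ratio that triggers a rebalance

With the automatic rebalance disabled, preferred leader election can still be triggered manually during a maintenance window:

    bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181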
Re: Broker Exceptions
Yes, the rebalance should not happen in that case. That is a little bit strange. Could you try to launch a clean Kafka cluster with auto.leader.rebalance.enable disabled and try pushing data? When leader migration occurs, a NotLeaderForPartition exception is expected.

Jiangjie (Becket) Qin

On 3/6/15, 3:14 PM, Zakee kzak...@netzero.net wrote:

Yes, Jiangjie, I do see lots of these "Starting preferred replica leader election for partitions" messages in the logs. I also see a lot of produce request failure warnings with the NotLeaderForPartition exception. I tried setting auto.leader.rebalance.enable to false, but I am still noticing the rebalance happening. My understanding was that the rebalance will not happen when this is set to false.

Thanks
Zakee
[...]
Kafka to Hadoop HDFS
Hi Kafka masters, I am wondering if there are any open source solutions to transfer messages received from Kafka to Hadoop HDFS? Thanks.

regards,
Lin
Re: Broker Exceptions
Thanks, Jiangjie, I will try with a clean cluster again.

Thanks
Zakee

On Mar 6, 2015, at 3:51 PM, Jiangjie Qin j...@linkedin.com.INVALID wrote:

Yes, the rebalance should not happen in that case. That is a little bit strange. Could you try to launch a clean Kafka cluster with auto.leader.rebalance.enable disabled and try pushing data? When leader migration occurs, a NotLeaderForPartition exception is expected.

Jiangjie (Becket) Qin
[...]
Re: Possible to count for unclosed resources in process
Hi, I think you can look at open file descriptors (network connections use FDs). For example:

https://apps.sematext.com/spm-reports/s/IoQDvdT0Ig -- all good
https://apps.sematext.com/spm-reports/s/v5Hvwta7PP -- Otis restarting 2 consumers

lsof probably shows it, too.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr Elasticsearch Support * http://sematext.com/

On Fri, Mar 6, 2015 at 4:04 PM, Gwen Shapira gshap...@cloudera.com wrote:

It doesn't keep track specifically, but there are open sockets that may take a while to clean themselves up. Note that if you use the async producer and don't close the producer nicely, you may miss messages, as the connection will close before all messages are sent. Guess how we found out? :) Similarly for the consumer: if you use the high-level consumer and don't close it nicely, you may not acknowledge the last messages, and they will be re-read the next time the consumer starts, leading to duplicates.

Gwen

On Fri, Mar 6, 2015 at 12:40 PM, Stuart Reynolds s...@stureynolds.com wrote:

One of our staff has been terrible at adding finally clauses to close Kafka resources. Does the Kafka Scala/Java client maintain a count or list of open producers/consumers/client connections?
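On a Linux host, lsof gives a quick count from the outside (a sketch; substitute the pid of your client JVM):

    lsof -p <app-pid> | wc -l              # all open file descriptors
    lsof -a -p <app-pid> -i TCP | wc -l    # TCP sockets only, where leaked producers/consumers show up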
Re: TopicFilters and 0.9 Consumer
Hi Guozhang, Thanks for confirming. It should be straightforward to make subscribe(TopicFilter) and subscribe(TopicFilter, Partition) work for added/removed topics, since this is mostly regex matching against ZooKeeper metadata. But any thoughts on how repartitioning would work? (We need to let the consumers know there are more partitions now, so they can spin up more consumers for those partitions.) Is there a writeup of sorts somewhere, hinting at what to expect?

Thanks
Vinoth

On Thu, Mar 5, 2015 at 8:56 AM, Guozhang Wang wangg...@gmail.com wrote:

Vinoth, Yes, we do have plans to continue supporting topic filters in the 0.9 consumer; the APIs are not there yet, though.

Guozhang

On Thu, Mar 5, 2015 at 8:32 AM, Vinoth Chandar vin...@uber.com wrote:

Hi guys, I was wondering what the plan in 0.9 is for the topic filters that are today in the high-level consumer. The new API's subscribe methods (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html) seem to work with real topic names (which I personally like). Given that the topic filters today support automatic topic discovery (e.g. MirrorMaker), would there be another mechanism to notify consumers of new topics and changes in partitions? (I was thinking about an internal topic that provided these system changes, which consumers could listen to and choose whether or not they want to subscribe to, say, a newly created topic.)

Thanks
Vinoth

--
-- Guozhang
Re: JMS to Kafka: Inbuilt JMSAdaptor/JMSProxy/JMSBridge (Client can speak JMS but hit Kafka)
I think this is great. I assume the form this would take would be a library that implements the JMS API, wrapping the existing Java producer and consumer? Our past experience has been that trying to maintain all this stuff centrally is too hard and tends to stifle rather than support innovation. So if you are interested in doing this, I would recommend doing a small GitHub project. We will definitely help promote it. Several people have asked for it, so I suspect you would definitely get some usage. I would also love to hear how well that adaptation works in practice, i.e. what percentage of JMS features are supportable by Kafka.

-Jay

On Thu, Mar 5, 2015 at 6:30 PM, Joshi, Rekha rekha_jo...@intuit.com wrote:

Hi, Kafka is a great alternative to JMS, providing high performance and throughput as a scalable, distributed pub-sub/commit log service. However, there will always be traditional systems running on JMS. Rather than rewriting them, it would be great if we just had an inbuilt JMSAdaptor/JMSProxy/JMSBridge by which a client can speak JMS but hit Kafka behind the scenes. Something like Chukwa's o.a.h.chukwa.datacollection.adaptor.jms.JMSAdaptor, which receives messages off a JMS queue and transforms them to a Chukwa chunk? I have come across folks talking of this need in the past as well. Is it considered and/or part of the roadmap?

http://grokbase.com/t/kafka/users/131cst8xpv/stomp-binding-for-kafka
http://grokbase.com/t/kafka/users/148dm4247q/consuming-messages-from-kafka-and-pushing-on-to-a-jms-queue
http://grokbase.com/t/kafka/users/143hjepbn2/request-kafka-zookeeper-jms-details

Looking for inputs on the correct way to approach this so as to retain all the good features of Kafka while still not rewriting the entire application. Possible?

Thanks
Rekha
Re: How does num.consumer.fetchers get used
Hi Tao, Yes, your understanding is correct. We probably should update the document to make it more clear. Could you open a ticket for it?

Jiangjie (Becket) Qin

On 3/6/15, 1:23 AM, tao xiao xiaotao...@gmail.com wrote:

Hi team, After reading the source code of AbstractFetcherManager I found that the usage of num.consumer.fetchers may not match what is described in the Kafka doc. [...] num.consumer.fetchers essentially sets the maximum number of fetcher threads, not the actual number of fetcher threads. The actual number is controlled by Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers. Is my assumption correct?

--
Regards,
Tao
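To make the mapping concrete, here is the arithmetic for a single topic-partition (illustrative values; plain math.abs is used in place of Kafka's Utils.abs, which additionally handles Int.MinValue):

    val numFetchers = 4  // num.consumer.fetchers
    val fetcherId = math.abs(31 * "mm-benchmark-test".hashCode + 0) % numFetchers
    // One topic with one partition always maps to exactly one (broker, fetcherId)
    // pair, so only one fetcher thread is created no matter how large
    // num.consumer.fetchers is set; the property is an upper bound, not a count.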
How does num.consumer.fetchers get used
Hi team, After reading the source code of AbstractFetcherManager I found that the usage of num.consumer.fetchers may not match what is described in the Kafka doc. My interpretation of the doc is that the number of fetcher threads is controlled by the value of the property num.consumer.fetchers: if I set num.consumer.fetchers=4, there are 4 fetcher threads in total created after the consumer is initialized. But the source code tells me a different thing. The code below is copied from AbstractFetcherManager:

    private def getFetcherId(topic: String, partitionId: Int): Int = {
      Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
    }

    def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
      mapLock synchronized {
        val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
          BrokerAndFetcherId(brokerAndInitialOffset.broker,
            getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
        }
        for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
          var fetcherThread: AbstractFetcherThread = null
          fetcherThreadMap.get(brokerAndFetcherId) match {
            case Some(f) => fetcherThread = f
            case None =>
              fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
              fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
              fetcherThread.start
          }
          fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
            case (topicAndPartition, brokerAndInitOffset) =>
              topicAndPartition -> brokerAndInitOffset.initOffset
          })
        }
      }
    }

If I have one topic with one partition and num.consumer.fetchers set to 4, there is actually only one fetcher thread created, not 4. num.consumer.fetchers essentially sets the maximum number of fetcher threads, not the actual number of fetcher threads. The actual number is controlled by this line of code:

    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers

Is my assumption correct?

--
Regards,
Tao
Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code
Hi team, I am hitting java.util.IllegalFormatConversionException when running MirrorMaker with the log level set to trace. The code is off the latest trunk, commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f. The way I bring it up is:

    bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties --producer.config ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties --num.streams 1 --num.producers 1 --no.data.loss --whitelist mm-benchmark-test\\w* --offset.commit.interval.ms 1 --queue.byte.size 1024

with the log level set to trace in tools-log4j.properties. Here is the log snippet:

[2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,211] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0, value=[B@434c4f70 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67, value=[B@21d8d293 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b, value=[B@1acd590b with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936, value=[B@bd3671 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on message for topic-partition mm-benchmark-test-0: (org.apache.kafka.clients.producer.internals.RecordBatch)
java.util.IllegalFormatConversionException: d != kafka.tools.MirrorMaker$UnackedOffset
    at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
    at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
    at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
    at java.util.Formatter.format(Formatter.java:2488)
    at java.util.Formatter.format(Formatter.java:2423)
    at java.lang.String.format(String.java:2790)
    at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
    at scala.collection.immutable.StringOps.format(StringOps.scala:31)
    at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
    at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
    at kafka.utils.Logging$class.trace(Logging.scala:36)
    at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)
    at kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)
    at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)
    at java.lang.Thread.run(Thread.java:745)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

--
Regards,
Tao
Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code
A bit more context: I turned on async in producer.properties.

On Sat, Mar 7, 2015 at 2:09 AM, tao xiao xiaotao...@gmail.com wrote:
[...]

--
Regards,
Tao
Re: TopicFilters and 0.9 Consumer
1. Partition / member changes are caught on the server side, which will notify consumers to re-balance.
2. Topic changes are caught on the client side through the metadata request; the client will then re-join the group with the new topic list so the server can re-balance.

Guozhang

On Fri, Mar 6, 2015 at 8:49 AM, Vinoth Chandar vin...@uber.com wrote:

Hi Guozhang, Thanks for confirming. It should be straightforward to make subscribe(TopicFilter) and subscribe(TopicFilter, Partition) work for added/removed topics, since this is mostly regex matching against ZooKeeper metadata. But any thoughts on how repartitioning would work? (We need to let the consumers know there are more partitions now, so they can spin up more consumers for those partitions.) Is there a writeup of sorts somewhere, hinting at what to expect?

Thanks
Vinoth
[...]

--
-- Guozhang
Possible to count for unclosed resources in process
One of our staff has been terrible at adding finally clauses to close Kafka resources. Does the Kafka Scala/Java client maintain a count or list of open producers/consumers/client connections?
Re: Possible to count for unclosed resources in process
It doesn't keep track specifically, but there are open sockets that may take a while to clean themselves up. Note that if you use the async producer and don't close the producer nicely, you may miss messages, as the connection will close before all messages are sent. Guess how we found out? :) Similarly for the consumer: if you use the high-level consumer and don't close it nicely, you may not acknowledge the last messages, and they will be re-read the next time the consumer starts, leading to duplicates.

Gwen

On Fri, Mar 6, 2015 at 12:40 PM, Stuart Reynolds s...@stureynolds.com wrote:

One of our staff has been terrible at adding finally clauses to close Kafka resources. Does the Kafka Scala/Java client maintain a count or list of open producers/consumers/client connections?
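Closing the client in a finally block avoids both failure modes. A minimal sketch with the new (0.8.2) producer; the broker list and topic name are placeholders:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092,broker2:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    try {
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("some-topic", "hello".getBytes))
    } finally {
      producer.close()  // blocks until buffered records are sent, so nothing is lost
    }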
Re: Possible to count for unclosed resources in process
You could also take a thread dump and try to find them by their network threads. For example, this is how the new producer's network threads are named:

    String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");

On Fri, Mar 6, 2015 at 1:04 PM, Gwen Shapira gshap...@cloudera.com wrote:
[...]

--
Thanks,
Ewen
Re: JMS to Kafka: Inbuilt JMSAdaptor/JMSProxy/JMSBridge (Client can speak JMS but hit Kafka)
Thank you Jay for your note. So a JMSAdaptor (or maybe MQKafkaBridge?) prototype it is, then! Will run a feature-compatibility feasibility check.

Thanks
Rekha

On 3/6/15, 8:45 AM, Jay Kreps jay.kr...@gmail.com wrote:

I think this is great. I assume the form this would take would be a library that implements the JMS API, wrapping the existing Java producer and consumer?
[...]
Re: Which node do you send data to?
Spencer, Kafka (and its clients) handle failover automatically for you. When you create a topic, you can select a replication factor. For a replication factor n, each partition of the topic will be replicated to n different brokers. At any given time, one of those brokers is the leader for that partition, and that is the only server you can communicate with to produce new messages. That broker will then make sure the data is copied to the other replicas. If that leader fails, one of the replicas will take over, and the producer will have to send data to that node. But all of this should happen automatically. As long as you set the bootstrap.servers setting (for the new producer) so that at least one of them is always available (ideally it should just include all the brokers), you shouldn't have to worry about this. Of course, if a node fails you have to deal with bringing it back up or moving it to a new machine, but producers should continue to function normally by moving traffic to the new leader.

On Fri, Mar 6, 2015 at 9:05 PM, Daniel Moreno d...@max2.com wrote:

Hi Spencer, You can configure your producers with a list of brokers. You can add all, but usually at least two, of the brokers in your cluster.

Kind Regards,
Daniel Moreno
[...]

--
Thanks,
Ewen
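Concretely, for the three-node cluster in the question, the producer config would just list every broker (host names here are illustrative):

    bootstrap.servers=kafka01:9092,kafka02:9092,kafka03:9092

The list is only used for the initial metadata fetch; after that the producer learns the full cluster layout and routes each message to the current leader for its partition, so losing kafka01 does not take anything down.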
Re: Which node do you send data to?
Hi Spencer, You can configure your producers with a list of brokers. You can add all, but usually at least two, of the brokers in your cluster.

Kind Regards,
Daniel Moreno

On Mar 6, 2015, at 23:43, Spencer Owen so...@netdocuments.com wrote:

I've set up a Kafka cluster with 3 nodes. Which node should I push the data to? I would normally push to kafka01, but if that node goes down, then the entire cluster goes down. How have other people solved this? Maybe an nginx reverse proxy?

http://stackoverflow.com/questions/28911410/which-node-should-i-push-data-to-in-a-cluster
Which node do you send data to?
I've set up a Kafka cluster with 3 nodes. Which node should I push the data to? I would normally push to kafka01, but if that node goes down, then the entire cluster goes down. How have other people solved this? Maybe an nginx reverse proxy?

http://stackoverflow.com/questions/28911410/which-node-should-i-push-data-to-in-a-cluster
Re: Kafka to Hadoop HDFS
This presentation from a recent Kafka meetup in NYC describes the different approaches:

http://www.slideshare.net/gwenshap/kafka-hadoop-for-nyc-kafka-meetup

Its companion blog post is here:

http://ingest.tips/2014/10/16/notes-from-kafka-meetup/

Cheers
Max

On Sat, Mar 7, 2015 at 12:18 AM, Daniel Moreno d...@max2.com wrote:

You can also try the approach described here: http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/

Kind Regards,
Daniel
[...]
Re: Kafka to Hadoop HDFS
You can also try the approach described here: http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/

Kind Regards,
Daniel

On Mar 6, 2015, at 23:20, Lin Ma lin...@gmail.com wrote:

Hi Kafka masters, I am wondering if there are any open source solutions to transfer messages received from Kafka to Hadoop HDFS? Thanks.

regards,
Lin
RE: Kafka to Hadoop HDFS
Try this: https://github.com/linkedin/camus

Aditya

From: Lin Ma [lin...@gmail.com]
Sent: Friday, March 06, 2015 8:19 PM
To: users@kafka.apache.org
Subject: Kafka to Hadoop HDFS

Hi Kafka masters, I am wondering if there are any open source solutions to transfer messages received from Kafka to Hadoop HDFS? Thanks.

regards,
Lin
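If neither Camus nor Flume fits and you want to hand-roll something, a bare-bones sketch with the 0.8.x high-level consumer and the HDFS client looks like the following (topic, group id, and paths are placeholders; it assumes fs.defaultFS points at your HDFS, and there is no batching, file rolling, or offset handling beyond auto-commit):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val props = new Properties()
    props.put("zookeeper.connect", "zk1:2181")
    props.put("group.id", "hdfs-copier")

    val connector = Consumer.create(new ConsumerConfig(props))
    val stream = connector.createMessageStreams(Map("events" -> 1))("events").head

    val fs  = FileSystem.get(new Configuration())
    val out = fs.create(new Path("/data/kafka/events.log"))
    for (msgAndMeta <- stream) {   // blocks forever waiting for new messages
      out.write(msgAndMeta.message())
      out.write('\n')
    }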
Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code
I think I worked out the root cause. Line 593 in MirrorMaker.scala:

    trace("Updating offset for %s to %d".format(topicPartition, offset))

should be:

    trace("Updating offset for %s to %d".format(topicPartition, offset.element))

On Sat, Mar 7, 2015 at 2:12 AM, tao xiao xiaotao...@gmail.com wrote:

A bit more context: I turned on async in producer.properties.
[...]

--
Regards,
Tao
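That matches the exception in the trace: %d accepts only integral arguments, so passing the wrapper object blows up at runtime. A self-contained two-liner reproduces it (the values are illustrative):

    "Updating offset for %s to %d".format("mm-benchmark-test-0", new Object)
    // => java.util.IllegalFormatConversionException: d != java.lang.Object
    "Updating offset for %s to %d".format("mm-benchmark-test-0", 42L)  // fine: %d gets a Long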
Re: Database Replication Question
Hi, James, You also mentioned you want to implement another critical component for monitoring the data consistency between your sources and targets and correcting inconsistent data. Unfortunately, data compare is not easy when applications or replication are still changing your source and target tables. It might be more complicated than your current replication solution. I spent a whole year completing such a component. I can't tell you how my solution works, but I can list a few challenges:

- To avoid affecting your application, the verification component should not lock the table or its data while reading it. You need to consider how to deal with uncommitted transactions during the data compare.
- The end-to-end latency is not static. Some transactions might not be committed for a long time, and replication latency might also fluctuate.
- Rows might keep changing during the compare. How do you deal with these hot rows?
- The distance between your source and target could be very long and the compare volume very large, yet the compare should be fast and the verification component should not eat a lot of CPU and memory.

Best wishes,
Xiao Li

On Mar 5, 2015, at 11:07 AM, James Cheng jch...@tivo.com wrote:

On Mar 5, 2015, at 12:59 AM, Xiao lixiao1...@gmail.com wrote:

Hi, James, This design regarding the restart point has a few potential issues, I think. The restart point is based on the messages that you last published. The message could be pruned. How large is your log.retention.hours?

That's a good point. In my case, I will be publishing this into a log-compacted topic. My goal is that by reading the log-compacted topic from beginning to end, and then continually applying new items as they come in, it will always contain the final state of the database. So messages will only be pruned if they are being overwritten by later ones, and in that case, the later one is the one that I now care about.

If the Kafka message order is different from your log sequence, your replication might lose data.

Good point. I think it depends on the use case, and also on whether it's possible to recover lost data. I don't think I'll be losing any messages, but it's possible that, due to network delays, items might arrive in Kafka out of order. And in my use case, that would be bad, because the last item wins. In the log-compaction case, I would be able to detect that, because the last item in Kafka for a particular primary key would not match the state of the item in the MySQL database. I could repair that item by re-publishing what is in the database out to the stream. I planned to have an auditor monitor the correctness of the Kafka topic, to ensure that it reflects the state of the database. I think I will now need to add a correction component that will republish any items that are incorrect.

First, I think you can maintain a local persistence media for recording the last published message id.

That works, but there is a small window of failure that can result in duplicates. If you normally: 1) write to Kafka, 2) write your last published message id somewhere, you might crash between steps 1 and 2. When you come back up, you might end up re-publishing your last event.

-James

Second, if you can add to each message a strictly increasing dense ID in the producers, you can easily recover the sequences in the consumers. If so, you can have multiple producers publish messages at the same time. This could improve your throughput, and your consumers can easily identify whether any message was lost for any reason.

Best wishes,
Xiao Li

On Mar 4, 2015, at 4:59 PM, James Cheng jch...@tivo.com wrote:

Another thing to think about is delivery guarantees: exactly once, at least once, etc. If you have a publisher that consumes the database log and pushes out to Kafka, and the publisher crashes, what happens when it starts back up? Depending on how you keep track of the database's transaction id/scn/offset, you may end up re-publishing events that you already published to the Kafka topic. I am also working on database replication, namely from MySQL to Kafka. I'm using some of the ideas from http://ben.kirw.in/2014/11/28/kafka-patterns/ in order to get exactly-once processing, so that I don't have any duplicates in my Kafka stream. Specifically, I have the publisher write messages to a single topic (I think/hope that Kafka's throughput is high enough). I include MySQL's binary log coordinates in my output messages. Upon startup, I read back the end of my topic to find out what messages I published. This gives me 2 pieces of information: 1) the MySQL binary log coordinates, so I know where to start again, and 2) the messages that I last published, to make sure that I don't re-publish them.
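The restart protocol James describes can be summarized in a few lines (a sketch only; Envelope and readLastMessage are hypothetical names, not a Kafka API):

    // Every published message carries the source position it came from.
    case class Envelope(binlogFile: String, binlogPos: Long, payload: Array[Byte])

    // On startup, read the tail of the topic (e.g. with a SimpleConsumer and an
    // OffsetRequest for the latest offset) instead of a local checkpoint file.
    def readLastMessage(topic: String): Option[Envelope] = ???  // hypothetical helper

    val resumeFrom = readLastMessage("db-changes").map(e => (e.binlogFile, e.binlogPos))
    // Resume the binlog reader just after resumeFrom and skip any source events
    // at or before it; a crash between publishing and checkpointing then cannot
    // introduce duplicates, because the published message is the checkpoint.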
Re: Database Replication Question
Hi, James, uh... the iOS Gmail app crashed. Let me resend the email to answer your concern. First, I am not a Kafka user. Like you, I am trying to see if Kafka can be used for replication-related tasks. If Kafka provided units of work, the design would be much simpler. As Guozhang said, Kafka has a plan to introduce transaction concepts. Hopefully, it will be available soon:

https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

Before this critical feature is available, let us discuss whether database replication can be built directly on the current Kafka with only minor Kafka code changes.

That works, but there is a small window of failure that can result in duplicates. If you normally: 1) write to Kafka, 2) write your last published message id somewhere, you might crash between steps 1 and 2. When you come back up, you might end up re-publishing your last event.

It can crash at any step and at any point in time. How do you recover from an unplanned disaster without doing a full refresh?

1) Kafka could crash (i.e., not stop gracefully). It could lose all the unflushed/non-fsynced data up to the latest recovery point.
- I still hope Kafka can always maintain two recovery points in separate files. At least we can then ensure one recovery point is valid if the latest one is corrupted during an update.
- The consumers might already have read the non-fsynced but flushed messages. That means the producers still need to republish the published messages. Note that fileOutputStream.getFD().sync() does an fsync and FileChannel.force() does a flush.

2) The producers could crash before completing the update of the "last published message ID". That means you might need to republish the last messages again.

To me, republishing messages might be unavoidable with the current Kafka. Thus, the design of consumers should assume that Kafka messages might be duplicated. Basically, we can introduce a dense ID generated by your consumers for each Kafka message. On each restart, we always choose the earliest of:
- the recovery points (offsets) in the Kafka recovery-point file,
- the offsets and IDs of the last message in the partitions,
- your local last published message IDs.

Best wishes,
Xiao Li

On Mar 5, 2015, at 11:07 AM, James Cheng jch...@tivo.com wrote:
[...]