Thanks Dave and Joel. I created a PR to add this note to the Upgrade Notes:
https://github.com/apache/kafka/pull/798 Please take a look. Ismael On Thu, Jan 21, 2016 at 7:43 AM, Joel Koshy <[email protected]> wrote: > Hi Dave, > > This change was introduced in > https://issues.apache.org/jira/browse/KAFKA-1755 for compacted topics. > > > > > Interestingly, none of the messages currently going to the topic use > > message > > compaction (i.e. they all have empty keys), although at some time in the > > past > > I may have sent a few messages with keys. Message compaction is being > > used for other topics. So, the 0.9.0.0 version of the broker seems to > > think the > > topic is compacted while the 0.8.2.1 broker apparently doesn't think so. > > Does > > this shed any light on things? > > > > Also I notice the error message says "Compacted topic", which suggests > that > > compaction is a property of the topic, and not individual messages as > > > > Yes - compaction is a topic-level property. You can use --describe to > verify that the topic is compacted or not and if you didn't intend it to be > compacted you can alter the configuration. > > I thought it was ok to send messages > > both > > with and without a key to the same topic, thus having compaction enabled > > for > > only a subset of the messages. Is this incorrect? > > > > In 0.9 you cannot send unkeyed messages to compacted topics. In 0.8.x this > would actually cause the log compaction thread to subsequently complain and > quit (and stop compacting all compacted topics). We did consider the > possibility of allowing producers to send both keyed and unkeyed but after > discussion we felt it would be better to fail fast and prevent unkeyed > messages from getting in. This was on the premise that supporting mixed > messages and only compacting a subset that have keys may not work very well > since the non-keyed messages would stick around indefinitely; however let > me know if you think differently on this and we can revisit. > > Joel > > > > Thanks, > > Dave > > > > > > [2016-01-20 19:21:44,923] ERROR [Replica Manager on Broker 172341926]: > > Error processing append operation on partition [shown_news_stories,7] > > (kafka.server.ReplicaManager) > > kafka.message.InvalidMessageException: Compacted topic cannot accept > > message without key. > > at > > > > > kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:250) > > at kafka.log.Log.liftedTree1$1(Log.scala:327) > > at kafka.log.Log.append(Log.scala:326) > > at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:442) > > at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428) > > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268) > > at > > kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428) > > at > > > > > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401) > > at > > > > > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386) > > at > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > > at > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > > at > > > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > > at > > > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) > > at > > > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) > > at > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) > > at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) > > at > > scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > > at > scala.collection.AbstractTraversable.map(Traversable.scala:105) > > at > > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386) > > at > > kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322) > > at > > kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366) > > at kafka.server.KafkaApis.handle(KafkaApis.scala:68) > > at > > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > > at java.lang.Thread.run(Thread.java:745) > > > > > > > > On Tue, Jan 19, 2016 at 2:50 AM, Ismael Juma <[email protected]> wrote: > > > > > Hi Dave, > > > > > > Do you get any errors logged in the broker when you get ACK error 2 > > > (InvalidMessage) while producing requests to a mixed version cluster? > It > > > would be helpful to see them. > > > > > > With regards to the kafka-console-producer.sh error, did you use the > > > 0.9.0.0 console producer with a mixed version cluster (ie some brokers > > were > > > on 0.8.2.1 while others were on 0.9.0.0)? If so, it is expected that it > > > won't work correctly. All the brokers should be upgraded before the > > clients > > > are upgraded (otherwise the 0.8.2.1 broker will send a response that > the > > > newer clients cannot handle). > > > > > > Ismael > > > > > > On Fri, Jan 15, 2016 at 7:52 PM, Dave Peterson <[email protected]> > > wrote: > > > > > > > Hi Ismael, > > > > > > > > I'm using bruce (https://github.com/ifwe/bruce) to send the produce > > > > requests, with a RequiredAcks value of 1. Everything works fine when > > > > all brokers are running 0.8.2.1. Also if I set up a new 0.9.0.0 > > > > cluster from scratch rather than trying to upgrade, everything works > > > > fine. The problem only occurs after upgrading one broker in the > > > > 3-broker cluster. > > > > > > > > The topic I am sending to has 8 partitions numbered 0-7. Doing > > > > further experimentation I see that the ACK error 2 occurs only when > > > > I send to partition 7. No problems occur when sending to partitions > > > > 0-6. If it helps I can send output from "kafka-topics.sh --describe" > > > > as well as tcpdump output showing the produce requests and responses. > > > > > > > > For comparison I tried using the 0.9.0.0 version of > > > > kafka-console-producer.sh to send messages. With the default > > > > RequiredAcks value of 0, it worked although I don't know which > > > > partition it sent to. With a RequiredAcks value of 1 I get the > > > > output shown below. > > > > > > > > Thanks, > > > > Dave > > > > > > > > > > > > > > > > [2016-01-15 10:33:28,843] ERROR Uncaught error in kafka producer I/O > > > > thread: (org.apache.kafka.clients.producer.internals.Sender) > > > > org.apache.kafka.common.protocol.types.SchemaException: Error reading > > > field > > > > 'throttle_time_ms': java.nio.BufferUnderflowException > > > > at > > > > org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) > > > > at > > > > > > > > > > > > > > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464) > > > > at > > > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) > > > > at > > > > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) > > > > at > > > > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) > > > > at java.lang.Thread.run(Thread.java:745) > > > > > > > > > > > > On Fri, Jan 15, 2016 at 1:06 AM, Ismael Juma <[email protected]> > > wrote: > > > > > > > > > Hi Dave, > > > > > > > > > > On Fri, Jan 15, 2016 at 2:04 AM, Dave Peterson <[email protected]> > > > > wrote: > > > > > > > > > > > I was trying to upgrade an 0.8.2.1 broker cluster to 0.9.0.0 by > > > > following > > > > > > the instructions here: > > > > > > > > > > > > http://kafka.apache.org/documentation.html#upgrade > > > > > > > > > > > > After upgrading one broker, with > > > inter.broker.protocol.version=0.8.2.X > > > > > > set, I get ACK error 2 (InvalidMessage) when I try to send > produce > > > > > > requests. > > > > > > > > > > > > > > > I haven't seen other reports of this issue yet. Also, we have a > > system > > > > test > > > > > that covers this scenario: > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/upgrade_test.py > > > > > > > > > > Just to double-check, what is the version of the producer that you > > are > > > > > using to send produce requests to the 0.9.0.0 broker when you get > the > > > > > error? > > > > > > > > > > Ismael > > > > > > > > > > > > > > >
