Repository: kafka Updated Branches: refs/heads/trunk cf8f4a713 -> eaaa433fc
http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 78e00df..1b0a127 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -24,17 +24,16 @@ import kafka.admin.AdminUtils import kafka.cluster.BrokerEndPoint import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet -import kafka.api.{KAFKA_0_9_0, KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1} +import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0} import kafka.common.{KafkaStorageException, TopicAndPartition} import ReplicaFetcherThread._ - -import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse} -import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode} -import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest} +import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient} +import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector} +import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse, RequestSend} import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.{Errors, ApiKeys} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.utils.Time import scala.collection.{JavaConverters, Map} @@ -263,15 +262,23 @@ class 
ReplicaFetcherThread(name: String, } private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = { - val partitions = Map( - topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1) - ) - val request = new ListOffsetRequest(consumerId, partitions.asJava) - val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, None, request) + val (request, apiVersion) = + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) { + val partitions = Map(topicPartition -> earliestOrLatest) + (new ListOffsetRequest(partitions.asJava, consumerId), 1) + } else { + val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1)) + (new ListOffsetRequest(consumerId, partitions.asJava), 0) + } + val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, Some(apiVersion.toShort), request) val response = new ListOffsetResponse(clientResponse.responseBody) val partitionData = response.responseData.get(topicPartition) Errors.forCode(partitionData.errorCode) match { - case Errors.NONE => partitionData.offsets.asScala.head + case Errors.NONE => + if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) + partitionData.offset + else + partitionData.offsets.get(0) case errorCode => throw errorCode.exception } } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index b449a69..c83b54f 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -14,7 +14,8 @@ package kafka.api import java.util -import java.util.Properties +import java.util.{Collections, Properties, Locale} + import 
java.util.regex.Pattern import kafka.log.LogConfig @@ -26,7 +27,7 @@ import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeseria import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor} import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{InvalidTopicException} +import org.apache.kafka.common.errors.InvalidTopicException import org.apache.kafka.common.record.{CompressionType, TimestampType} import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.junit.Assert._ @@ -34,7 +35,6 @@ import org.junit.Test import scala.collection.JavaConverters._ import scala.collection.mutable.Buffer -import java.util.Locale /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */ class PlaintextConsumerTest extends BaseConsumerTest { @@ -973,6 +973,84 @@ class PlaintextConsumerTest extends BaseConsumerTest { } @Test + def testOffsetsForTimes() { + val numParts = 2 + val topic1 = "part-test-topic-1" + val topic2 = "part-test-topic-2" + val topic3 = "part-test-topic-3" + val props = new Properties() + props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0") + TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers) + // Topic2 is in old message format. 
+ TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers, props) + TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers) + + val consumer = this.consumers.head + + // Test negative target time + intercept[IllegalArgumentException]( + consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(topic1, 0), -1))) + + val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]() + var i = 0 + for (topic <- List(topic1, topic2, topic3)) { + for (part <- 0 until numParts) { + val tp = new TopicPartition(topic, part) + // In sendRecords(), each message will have key, value and timestamp equal to the sequence number. + sendRecords(100, tp) + timestampsToSearch.put(tp, i * 20) + i += 1 + } + } + // The timestampToSearch map should contain: + // (topic1Partition0 -> 0, + // topic1Partitoin1 -> 20, + // topic2Partition0 -> 40, + // topic2Partition1 -> 60, + // topic3Partition0 -> 80, + // topic3Partition1 -> 100) + val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch) + assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).offset()) + assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).timestamp()) + assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).offset()) + assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).timestamp()) + assertEquals("null should be returned when message format is 0.9.0", + null, timestampOffsets.get(new TopicPartition(topic2, 0))) + assertEquals("null should be returned when message format is 0.9.0", + null, timestampOffsets.get(new TopicPartition(topic2, 1))) + assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).offset()) + assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).timestamp()) + assertEquals(null, timestampOffsets.get(new TopicPartition(topic3, 1))) + } + + @Test + def testEarliestOrLatestOffsets() { + val topic0 = "topicWithNewMessageFormat" + val topic1 = 
"topicWithOldMessageFormat" + createTopicAndSendRecords(topicName = topic0, numPartitions = 2, recordsPerPartition = 100) + val props = new Properties() + props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0") + TestUtils.createTopic(this.zkUtils, topic1, numPartitions = 1, replicationFactor = 1, this.servers, props) + sendRecords(100, new TopicPartition(topic1, 0)) + + val t0p0 = new TopicPartition(topic0, 0) + val t0p1 = new TopicPartition(topic0, 1) + val t1p0 = new TopicPartition(topic1, 0) + val partitions = Set(t0p0, t0p1, t1p0).asJava + val consumer = this.consumers.head + + val earliests = consumer.beginningOffsets(partitions) + assertEquals(0L, earliests.get(t0p0)) + assertEquals(0L, earliests.get(t0p1)) + assertEquals(0L, earliests.get(t1p0)) + + val latests = consumer.endOffsets(partitions) + assertEquals(100L, latests.get(t0p0)) + assertEquals(100L, latests.get(t0p1)) + assertEquals(100L, latests.get(t1p0)) + } + + @Test def testUnsubscribeTopic() { this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 49feebd..04d46de 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -199,17 +199,17 @@ class LogSegmentTest { assertEquals(490, seg.largestTimestamp) // Search for an indexed timestamp - assertEquals(42, seg.findOffsetByTimestamp(420).get) - assertEquals(43, seg.findOffsetByTimestamp(421).get) + assertEquals(42, seg.findOffsetByTimestamp(420).get.offset) + assertEquals(43, 
seg.findOffsetByTimestamp(421).get.offset) // Search for an un-indexed timestamp - assertEquals(43, seg.findOffsetByTimestamp(430).get) - assertEquals(44, seg.findOffsetByTimestamp(431).get) + assertEquals(43, seg.findOffsetByTimestamp(430).get.offset) + assertEquals(44, seg.findOffsetByTimestamp(431).get.offset) // Search beyond the last timestamp - assertEquals(50, seg.findOffsetByTimestamp(491).get) + assertEquals(None, seg.findOffsetByTimestamp(491)) // Search before the first indexed timestamp - assertEquals(41, seg.findOffsetByTimestamp(401).get) + assertEquals(41, seg.findOffsetByTimestamp(401).get.offset) // Search before the first timestamp - assertEquals(40, seg.findOffsetByTimestamp(399).get) + assertEquals(40, seg.findOffsetByTimestamp(399).get.offset) } /** @@ -251,7 +251,7 @@ class LogSegmentTest { TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt) seg.recover(64*1024) for(i <- 0 until 100) - assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset) + assertEquals(i, seg.read(i, Some(i + 1), 1024).messageSet.head.offset) } /** @@ -267,8 +267,9 @@ class LogSegmentTest { TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt) seg.recover(64*1024) for(i <- 0 until 100) { - assertEquals(i, seg.findOffsetByTimestamp(i * 10).get) - assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get) + assertEquals(i, seg.findOffsetByTimestamp(i * 10).get.offset) + if (i < 99) + assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get.offset) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index e496853..9ecb651 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -625,9 +625,9 @@ class LogTest extends 
JUnitSuite { for(i <- 0 until numMessages) { assertEquals(i, log.read(i, 100, None).messageSet.head.offset) if (i == 0) - assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10)) + assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) else - assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10)) + assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) } log.close() } @@ -701,9 +701,9 @@ class LogTest extends JUnitSuite { for(i <- 0 until numMessages) { assertEquals(i, log.read(i, 100, None).messageSet.head.offset) if (i == 0) - assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10)) + assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) else - assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10)) + assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset) } log.close() } http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index d8c2b4e..629babb 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -20,8 +20,30 @@ <h3><a id="upgrade" href="#upgrade">1.5 Upgrading From Previous Versions</a></h3> <h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.10.0.X to 0.10.1.0</a></h4> -0.10.1.0 is compatible with 0.10.0.X in terms of wire protocol. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it. +0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade. 
+<br> +Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients. + +<p><b>For a rolling upgrade:</b></p> + +<ol> + <li> Update server.properties file on all brokers and add the following properties: + <ul> + <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0).</li> + <li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)</li> + </ul> + </li> + <li> Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. </li> + <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0. NOTE: If your previous message format version is before 0.10.0, you shouldn't touch log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or above. </li> + <li> Restart the brokers one by one for the new protocol version to take effect. </li> + <li> Once all consumers have been upgraded to 0.10.0, change log.message.format.version to 0.10.1 on each broker and restart them one by one. + </li> +</ol> + +<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.</p> + +<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.</p>
<h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a></h5> <ul> @@ -30,6 +52,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking c <li> The open file handlers of 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li> <li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of offset index entry. User may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. </li> <li> Due to the increased number of index files, on some brokers with large amount the log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may reduce the log loading time. </li> + <li> ListOffsetRequest v1 is introduced and used by default to support accurate offset search based on timestamp. </li> </ul> <h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5> @@ -37,6 +60,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking c <li> The new Java consumer is no longer in beta and we recommend it for all new development. The old Scala consumers are still supported, but they will be deprecated in the next release and will be removed in a future major release. </li> <li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li> + <li> The new Java Consumer now allows users to search offsets by timestamp on partitions. </li>
</ul> <h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 0.10.0.0</a></h4>