Repository: kafka
Updated Branches:
  refs/heads/trunk cf8f4a713 -> eaaa433fc


http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 78e00df..1b0a127 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -24,17 +24,16 @@ import kafka.admin.AdminUtils
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{KAFKA_0_9_0, KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1}
+import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
 import ReplicaFetcherThread._
-
-import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse}
-import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode}
-import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest}
+import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse, RequestSend}
 import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{Errors, ApiKeys}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.{JavaConverters, Map}
@@ -263,15 +262,23 @@ class ReplicaFetcherThread(name: String,
   }
 
   private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = {
-    val partitions = Map(
-      topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1)
-    )
-    val request = new ListOffsetRequest(consumerId, partitions.asJava)
-    val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, None, request)
+    val (request, apiVersion) =
+      if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
+        val partitions = Map(topicPartition -> earliestOrLatest)
+        (new ListOffsetRequest(partitions.asJava, consumerId), 1)
+      } else {
+        val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1))
+        (new ListOffsetRequest(consumerId, partitions.asJava), 0)
+      }
+    val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, Some(apiVersion.toShort), request)
     val response = new ListOffsetResponse(clientResponse.responseBody)
     val partitionData = response.responseData.get(topicPartition)
     Errors.forCode(partitionData.errorCode) match {
-      case Errors.NONE => partitionData.offsets.asScala.head
+      case Errors.NONE =>
+        if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2)
+          partitionData.offset
+        else
+          partitionData.offsets.get(0)
       case errorCode => throw errorCode.exception
     }
   }
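
The hunk above gates the ListOffsets API version on inter.broker.protocol.version: brokers at KAFKA_0_10_1_IV2 or later send v1 of ListOffsetRequest (a timestamp-based search returning a single offset), while older brokers fall back to v0 (which takes a max-number-of-offsets argument and returns a list of offsets). A minimal Scala sketch of the pattern, using illustrative stand-in types rather than Kafka's actual request classes:

    object ListOffsetVersionGate {
      sealed trait OffsetRequest
      // v0 shape: timestamp plus maxNumOffsets; the response carries a list of offsets.
      final case class OffsetRequestV0(timestamp: Long, maxNumOffsets: Int) extends OffsetRequest
      // v1 shape: timestamp only; the response carries a single (timestamp, offset) pair.
      final case class OffsetRequestV1(timestamp: Long) extends OffsetRequest

      // Mirrors the if/else in the diff: the request shape and the wire version
      // are chosen together so they can never disagree.
      def build(atLeastIV2: Boolean, timestamp: Long): (OffsetRequest, Short) =
        if (atLeastIV2) (OffsetRequestV1(timestamp), 1.toShort)
        else (OffsetRequestV0(timestamp, maxNumOffsets = 1), 0.toShort)
    }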

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index b449a69..c83b54f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,7 +14,8 @@ package kafka.api
 
 
 import java.util
-import java.util.Properties
+import java.util.{Collections, Locale, Properties}
+
 import java.util.regex.Pattern
 
 import kafka.log.LogConfig
@@ -26,7 +27,7 @@ import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeseria
 import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor}
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{InvalidTopicException}
+import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.junit.Assert._
@@ -34,7 +35,6 @@ import org.junit.Test
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
-import java.util.Locale
 
 /* We have some tests in this class instead of `BaseConsumerTest` in order to keep the build time under control. */
 class PlaintextConsumerTest extends BaseConsumerTest {
@@ -973,6 +973,84 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   }
 
   @Test
+  def testOffsetsForTimes() {
+    val numParts = 2
+    val topic1 = "part-test-topic-1"
+    val topic2 = "part-test-topic-2"
+    val topic3 = "part-test-topic-3"
+    val props = new Properties()
+    props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
+    TestUtils.createTopic(this.zkUtils, topic1, numParts, 1, this.servers)
+    // Topic2 is in old message format.
+    TestUtils.createTopic(this.zkUtils, topic2, numParts, 1, this.servers, props)
+    TestUtils.createTopic(this.zkUtils, topic3, numParts, 1, this.servers)
+
+    val consumer = this.consumers.head
+
+    // Test negative target time
+    intercept[IllegalArgumentException](
+      consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(topic1, 0), -1L)))
+
+    val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]()
+    var i = 0
+    for (topic <- List(topic1, topic2, topic3)) {
+      for (part <- 0 until numParts) {
+        val tp = new TopicPartition(topic, part)
+        // In sendRecords(), each message will have key, value and timestamp equal to the sequence number.
+        sendRecords(100, tp)
+        timestampsToSearch.put(tp, i * 20L)
+        i += 1
+      }
+    }
+    // The timestampToSearch map should contain:
+    // (topic1Partition0 -> 0,
+    //  topic1Partition1 -> 20,
+    //  topic2Partition0 -> 40,
+    //  topic2Partition1 -> 60,
+    //  topic3Partition0 -> 80,
+    //  topic3Partition1 -> 100)
+    val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch)
+    assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).offset())
+    assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).timestamp())
+    assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).offset())
+    assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).timestamp())
+    assertEquals("null should be returned when message format is 0.9.0",
+      null, timestampOffsets.get(new TopicPartition(topic2, 0)))
+    assertEquals("null should be returned when message format is 0.9.0",
+      null, timestampOffsets.get(new TopicPartition(topic2, 1)))
+    assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).offset())
+    assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).timestamp())
+    assertEquals(null, timestampOffsets.get(new TopicPartition(topic3, 1)))
+  }
+
+  @Test
+  def testEarliestOrLatestOffsets() {
+    val topic0 = "topicWithNewMessageFormat"
+    val topic1 = "topicWithOldMessageFormat"
+    createTopicAndSendRecords(topicName = topic0, numPartitions = 2, recordsPerPartition = 100)
+    val props = new Properties()
+    props.setProperty(LogConfig.MessageFormatVersionProp, "0.9.0")
+    TestUtils.createTopic(this.zkUtils, topic1, numPartitions = 1, replicationFactor = 1, this.servers, props)
+    sendRecords(100, new TopicPartition(topic1, 0))
+
+    val t0p0 = new TopicPartition(topic0, 0)
+    val t0p1 = new TopicPartition(topic0, 1)
+    val t1p0 = new TopicPartition(topic1, 0)
+    val partitions = Set(t0p0, t0p1, t1p0).asJava
+    val consumer = this.consumers.head
+
+    val earliests = consumer.beginningOffsets(partitions)
+    assertEquals(0L, earliests.get(t0p0))
+    assertEquals(0L, earliests.get(t0p1))
+    assertEquals(0L, earliests.get(t1p0))
+
+    val latests = consumer.endOffsets(partitions)
+    assertEquals(100L, latests.get(t0p0))
+    assertEquals(100L, latests.get(t0p1))
+    assertEquals(100L, latests.get(t1p0))
+  }
+
+  @Test
   def testUnsubscribeTopic() {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 49feebd..04d46de 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -199,17 +199,17 @@ class LogSegmentTest {
 
     assertEquals(490, seg.largestTimestamp)
     // Search for an indexed timestamp
-    assertEquals(42, seg.findOffsetByTimestamp(420).get)
-    assertEquals(43, seg.findOffsetByTimestamp(421).get)
+    assertEquals(42, seg.findOffsetByTimestamp(420).get.offset)
+    assertEquals(43, seg.findOffsetByTimestamp(421).get.offset)
     // Search for an un-indexed timestamp
-    assertEquals(43, seg.findOffsetByTimestamp(430).get)
-    assertEquals(44, seg.findOffsetByTimestamp(431).get)
+    assertEquals(43, seg.findOffsetByTimestamp(430).get.offset)
+    assertEquals(44, seg.findOffsetByTimestamp(431).get.offset)
     // Search beyond the last timestamp
-    assertEquals(50, seg.findOffsetByTimestamp(491).get)
+    assertEquals(None, seg.findOffsetByTimestamp(491))
     // Search before the first indexed timestamp
-    assertEquals(41, seg.findOffsetByTimestamp(401).get)
+    assertEquals(41, seg.findOffsetByTimestamp(401).get.offset)
     // Search before the first timestamp
-    assertEquals(40, seg.findOffsetByTimestamp(399).get)
+    assertEquals(40, seg.findOffsetByTimestamp(399).get.offset)
   }
 
   /**
@@ -251,7 +251,7 @@ class LogSegmentTest {
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100)
-      assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset)
+      assertEquals(i, seg.read(i, Some(i + 1), 1024).messageSet.head.offset)
   }
 
   /**
@@ -267,8 +267,9 @@ class LogSegmentTest {
     TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100) {
-      assertEquals(i, seg.findOffsetByTimestamp(i * 10).get)
-      assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get)
+      assertEquals(i, seg.findOffsetByTimestamp(i * 10).get.offset)
+      if (i < 99)
+        assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get.offset)
     }
   }
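
These assertion changes capture the new contract of findOffsetByTimestamp: it now returns an Option of a timestamp/offset entry (hence the .get.offset), and a search beyond the largest timestamp in the segment yields None instead of clamping to the last offset. A hedged sketch of that contract over a sorted sequence, with TimestampOffset as an illustrative stand-in for the real type:

    object TimestampLookupSketch {
      final case class TimestampOffset(timestamp: Long, offset: Long)

      // entries must be sorted by timestamp, mirroring a segment's time index.
      def findOffsetByTimestamp(entries: IndexedSeq[TimestampOffset], target: Long): Option[TimestampOffset] =
        if (entries.isEmpty || target > entries.last.timestamp) None
        else entries.find(_.timestamp >= target) // first entry at or after the target
    }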
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index e496853..9ecb651 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -625,9 +625,9 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
     }
     log.close()
   }
@@ -701,9 +701,9 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10).get.offset)
     }
     log.close()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/eaaa433f/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index d8c2b4e..629babb 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -20,8 +20,30 @@
 <h3><a id="upgrade" href="#upgrade">1.5 Upgrading From Previous 
Versions</a></h3>
 
 <h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.10.0.X to 
0.10.1.0</a></h4>
-0.10.1.0 is compatible with 0.10.0.X in terms of wire protocol. The upgrade 
can be done one broker at a time by simply bringing it down, updating the code, 
and restarting it.
+0.10.1.0 has wire protocol changes. By following the recommended rolling 
upgrade plan below, you guarantee no downtime during the upgrade.
 However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking 
changes in 0.10.1.0</a> before upgrade.
+<br>
+Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update server.properties file on all brokers and add the following properties:
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0.0 or 0.10.0.0).</li>
+            <li>log.message.format.version=CURRENT_KAFKA_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)</li>
+        </ul>
+    </li>
+    <li> Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0. NOTE: If your previous message format version is before 0.10.0, you shouldn't touch log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or above. </li>
+    <li> Restart the brokers one by one for the new protocol version to take effect. </li>
+    <li> Once all consumers have been upgraded to 0.10.0, change log.message.format.version to 0.10.1 on each broker and restart them one by one.
+    </li>
+</ol>
+
+<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
+
+<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
 
 <h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential 
breaking changes in 0.10.1.0</a></h5>
 <ul>
@@ -30,6 +52,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking c
     <li> The open file handlers of 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li>
     <li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry (12 bytes vs. 8 bytes), users may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. </li>
     <li> Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could be longer. Based on our experiment, setting num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
+    <li> ListOffsetRequest v1 is introduced and used by default to support accurate offset search based on timestamp.</li>
 </ul>
 
 <h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes 
in 0.10.1.0</a></h5>
@@ -37,6 +60,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking c
     <li> The new Java consumer is no longer in beta and we recommend it for all new development. The old Scala consumers are still supported, but they will be deprecated in the next release
          and will be removed in a future major release. </li>
     <li> The BrokerState "RunningAsController" (value 4) has been removed. Due 
to a bug, a broker would only be in this state briefly before transitioning out 
of it and hence the impact of the removal should be minimal. The recommended 
way to detect if a given broker is the controller is via the 
kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
+    <li> The new Java consumer now allows users to search offsets by timestamp on partitions.</li>
 </ul>
 
 <h4><a id="upgrade_10" href="#upgrade_10">Upgrading from 0.8.x or 0.9.x to 
0.10.0.0</a></h4>
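
As a concrete illustration of steps 1 and 3 of the rolling upgrade plan above (the version numbers below are examples for a cluster coming from 0.10.0, not part of the committed docs), the server.properties changes would look like:

    # Step 1: pin both versions to the release you are upgrading from.
    inter.broker.protocol.version=0.10.0
    log.message.format.version=0.10.0

    # Step 3, once every broker runs the new code, replace the protocol line with:
    # inter.broker.protocol.version=0.10.1.0
    # and leave log.message.format.version at 0.10.0 until all consumers are on
    # 0.10.0.0 or above.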
