This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e26d0d7  MINOR: Revert incompatible behavior change to consumer reset 
tool (#4611)
e26d0d7 is described below

commit e26d0d760466c3da6189637a7de69eb5509e7b51
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Feb 23 15:31:50 2018 -0800

    MINOR: Revert incompatible behavior change to consumer reset tool (#4611)
    
    This patch reverts the removal of the --execute option in the offset reset 
tool and the change to the default behavior when no options were present. For 
consistency, this patch adds the --execute flag to the streams reset tool, but 
keeps its current default behavior. A note has been added to both of these 
commands to warn the user that future default behavior will be to prompt before 
acting.
    
    Test cases were not actually validating that offsets were committed when 
the --execute option was present, so I have fixed that and added basic 
assertions for the dry-run behavior. I also removed some duplicated test 
boilerplate.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 112 +++--
 .../main/scala/kafka/tools/StreamsResetter.java    |  56 ++-
 .../kafka/admin/ConsumerGroupCommandTest.scala     |  19 +
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 530 +++++++--------------
 .../integration/AbstractResetIntegrationTest.java  |  17 +-
 .../kafka/streams/tools/StreamsResetterTest.java   |   3 -
 6 files changed, 309 insertions(+), 428 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 4c2d6c7..6818631 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -114,16 +114,14 @@ object ConsumerGroupCommand extends Logging {
   }
 
   def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, 
OffsetAndMetadata]): Unit = {
-    print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
-    println()
+    println("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET"))
 
     groupAssignmentsToReset.foreach {
       case (consumerAssignment, offsetAndMetadata) =>
-        print("%-30s %-10s %-15s".format(
-          consumerAssignment.topic(),
-          consumerAssignment.partition(),
-          offsetAndMetadata.offset()))
-        println()
+        println("%-30s %-10s %-15s".format(
+          consumerAssignment.topic,
+          consumerAssignment.partition,
+          offsetAndMetadata.offset))
     }
   }
 
@@ -284,7 +282,7 @@ object ConsumerGroupCommand extends Logging {
     protected def opts: ConsumerGroupCommandOptions
 
     protected def getLogEndOffset(topicPartition: TopicPartition): 
LogOffsetResult =
-      
getLogEndOffsets(Seq(topicPartition)).get(topicPartition).getOrElse(LogOffsetResult.Ignore)
+      getLogEndOffsets(Seq(topicPartition)).getOrElse(topicPartition, 
LogOffsetResult.Ignore)
 
     protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): 
Map[TopicPartition, LogOffsetResult]
 
@@ -550,46 +548,43 @@ object ConsumerGroupCommand extends Logging {
     // `consumer` is only needed for `describe`, so we instantiate it lazily
     private var consumer: KafkaConsumer[String, String] = _
 
-    def listGroups(): List[String] = {
+    override def listGroups(): List[String] = {
       adminClient.listAllConsumerGroupsFlattened().map(_.groupId)
     }
 
-    def collectGroupOffsets(): (Option[String], 
Option[Seq[PartitionAssignmentState]]) = {
+    override def collectGroupOffsets(): (Option[String], 
Option[Seq[PartitionAssignmentState]]) = {
       val group = opts.options.valueOf(opts.groupOpt)
       val consumerGroupSummary = adminClient.describeConsumerGroup(group, 
opts.options.valueOf(opts.timeoutMsOpt))
-      (Some(consumerGroupSummary.state),
-        consumerGroupSummary.consumers match {
-          case None =>
-            None
-          case Some(consumers) =>
-            var assignedTopicPartitions = Array[TopicPartition]()
-            val offsets = adminClient.listGroupOffsets(group)
-            val rowsWithConsumer =
-              if (offsets.isEmpty)
-                List[PartitionAssignmentState]()
-              else {
-                
consumers.filter(_.assignment.nonEmpty).sortWith(_.assignment.size > 
_.assignment.size).flatMap { consumerSummary =>
-                  val topicPartitions = consumerSummary.assignment
-                  assignedTopicPartitions = assignedTopicPartitions ++ 
consumerSummary.assignment
-                  val partitionOffsets: Map[TopicPartition, Option[Long]] = 
consumerSummary.assignment.map { topicPartition =>
-                    new TopicPartition(topicPartition.topic, 
topicPartition.partition) -> offsets.get(topicPartition)
-                  }.toMap
-                  collectConsumerAssignment(group, 
Some(consumerGroupSummary.coordinator), topicPartitions,
-                    partitionOffsets, Some(s"${consumerSummary.consumerId}"), 
Some(s"${consumerSummary.host}"),
-                    Some(s"${consumerSummary.clientId}"))
-                }
-              }
-
-            val rowsWithoutConsumer = 
offsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
-              case (topicPartition, offset) =>
-                collectConsumerAssignment(group, 
Some(consumerGroupSummary.coordinator), Seq(topicPartition),
-                    Map(topicPartition -> Some(offset)), 
Some(MISSING_COLUMN_VALUE),
-                    Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
+      val assignments = consumerGroupSummary.consumers.map { consumers =>
+        var assignedTopicPartitions = Array[TopicPartition]()
+        val offsets = adminClient.listGroupOffsets(group)
+        val rowsWithConsumer =
+          if (offsets.isEmpty)
+            List[PartitionAssignmentState]()
+          else {
+            consumers.filter(_.assignment.nonEmpty).sortWith(_.assignment.size 
> _.assignment.size).flatMap { consumerSummary =>
+              val topicPartitions = consumerSummary.assignment
+              assignedTopicPartitions = assignedTopicPartitions ++ 
consumerSummary.assignment
+              val partitionOffsets: Map[TopicPartition, Option[Long]] = 
consumerSummary.assignment.map { topicPartition =>
+                new TopicPartition(topicPartition.topic, 
topicPartition.partition) -> offsets.get(topicPartition)
+              }.toMap
+              collectConsumerAssignment(group, 
Some(consumerGroupSummary.coordinator), topicPartitions,
+                partitionOffsets, Some(s"${consumerSummary.consumerId}"), 
Some(s"${consumerSummary.host}"),
+                Some(s"${consumerSummary.clientId}"))
             }
+          }
 
-            Some(rowsWithConsumer ++ rowsWithoutConsumer)
+        val rowsWithoutConsumer = 
offsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
+          case (topicPartition, offset) =>
+            collectConsumerAssignment(group, 
Some(consumerGroupSummary.coordinator), Seq(topicPartition),
+              Map(topicPartition -> Some(offset)), Some(MISSING_COLUMN_VALUE),
+              Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
+        }
+
+        rowsWithConsumer ++ rowsWithoutConsumer
       }
-      )
+
+      (Some(consumerGroupSummary.state), assignments)
     }
 
     override def collectGroupMembers(verbose: Boolean): (Option[String], 
Option[Seq[MemberAssignmentState]]) = {
@@ -680,7 +675,9 @@ object ConsumerGroupCommand extends Logging {
         case "Empty" | "Dead" =>
           val partitionsToReset = getPartitionsToReset(groupId)
           val preparedOffsets = prepareOffsetsToReset(groupId, 
partitionsToReset)
-          val dryRun = opts.options.has(opts.dryRunOpt)
+
+          // Dry-run is the default behavior if --execute is not specified
+          val dryRun = opts.options.has(opts.dryRunOpt) || 
!opts.options.has(opts.executeOpt)
           if (!dryRun)
             getConsumer.commitSync(preparedOffsets.asJava)
           preparedOffsets
@@ -796,7 +793,7 @@ object ConsumerGroupCommand extends Logging {
         val (partitionsToResetWithCommittedOffset, 
partitionsToResetWithoutCommittedOffset) =
           
partitionsToReset.partition(currentCommittedOffsets.keySet.contains(_))
 
-        val preparedOffsetsForParititionsWithCommittedOffset = 
partitionsToResetWithCommittedOffset.map { topicPartition =>
+        val preparedOffsetsForPartitionsWithCommittedOffset = 
partitionsToResetWithCommittedOffset.map { topicPartition =>
           (topicPartition, new 
OffsetAndMetadata(currentCommittedOffsets.get(topicPartition) match {
             case Some(offset) => offset
             case _ => throw new IllegalStateException(s"Expected a valid 
current offset for topic partition: $topicPartition")
@@ -808,7 +805,7 @@ object ConsumerGroupCommand extends Logging {
           case (topicPartition, _) => 
CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of 
topic partition: $topicPartition")
         }
 
-        preparedOffsetsForParititionsWithCommittedOffset ++ 
preparedOffsetsForPartitionsWithoutCommittedOffset
+        preparedOffsetsForPartitionsWithCommittedOffset ++ 
preparedOffsetsForPartitionsWithoutCommittedOffset
       } else {
         CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires 
one of the following scenarios: %s".format(opts.resetOffsetsOpt, 
opts.allResetOffsetScenarioOpts) )
       }
@@ -838,7 +835,7 @@ object ConsumerGroupCommand extends Logging {
     }
 
     override def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, 
OffsetAndMetadata]): String = {
-      val rows = assignmentsToReset.map { case (k,v) => 
s"${k.topic()},${k.partition()},${v.offset()}" }(collection.breakOut): 
List[String]
+      val rows = assignmentsToReset.map { case (k,v) => 
s"${k.topic},${k.partition},${v.offset}" }(collection.breakOut): List[String]
       rows.foldRight("")(_ + "\n" + _)
     }
 
@@ -899,10 +896,13 @@ object ConsumerGroupCommand extends Logging {
       "or is going through some changes)."
     val CommandConfigDoc = "Property file containing configs to be passed to 
Admin Client and Consumer."
     val ResetOffsetsDoc = "Reset offsets of consumer group. Supports one 
consumer group at the time, and instances should be inactive" + nl +
-      "Has 3 execution options: (default) to plan which offsets to reset, 
--execute to execute the reset-offsets process, and --export to export the 
results to a CSV format." + nl +
-      "Has the following scenarios to choose: --to-datetime, --by-period, 
--to-earliest, --to-latest, --shift-by, --from-file, --to-current. One scenario 
must be choose" + nl +
-      "To define the scope use: --all-topics or --topic. . One scope must be 
choose, unless you use '--from-file' scenario"
+      "Has 2 execution options: --dry-run (the default) to plan which offsets 
to reset, and --execute to update the offsets. " +
+      "Additionally, the --export option is used to export the results to a 
CSV format." + nl +
+      "You must choose one of the following reset specifications: 
--to-datetime, --by-period, --to-earliest, " +
+      "--to-latest, --shift-by, --from-file, --to-current." + nl +
+      "To define the scope use --all-topics or --topic. One scope must be 
specified unless you use '--from-file'."
     val DryRunDoc = "Only show results without executing changes on Consumer 
Groups. Supported operations: reset-offsets."
+    val ExecuteDoc = "Execute operation. Supported operations: reset-offsets."
     val ExportDoc = "Export operation execution to a CSV file. Supported 
operations: reset-offsets."
     val ResetToOffsetDoc = "Reset offsets to a specific offset."
     val ResetFromFileDoc = "Reset offsets to values defined in CSV file."
@@ -955,6 +955,7 @@ object ConsumerGroupCommand extends Logging {
                                   .ofType(classOf[String])
     val resetOffsetsOpt = parser.accepts("reset-offsets", ResetOffsetsDoc)
     val dryRunOpt = parser.accepts("dry-run", DryRunDoc)
+    val executeOpt = parser.accepts("execute", ExecuteDoc)
     val exportOpt = parser.accepts("export", ExportDoc)
     val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc)
                            .withRequiredArg()
@@ -1029,8 +1030,19 @@ object ConsumerGroupCommand extends Logging {
         CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
 
       if (options.has(deleteOpt) && !options.has(groupOpt) && 
!options.has(topicOpt))
-        CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, 
%s, or both".format(deleteOpt, groupOpt, topicOpt))
-      if (options.has(resetOffsetsOpt))
+        CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt either 
takes $groupOpt, $topicOpt, or both")
+
+      if (options.has(resetOffsetsOpt)) {
+        if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
+          Console.err.println("WARN: In a future major release, the default 
behavior of this command will be to " +
+            "prompt the user before executing the reset rather than doing a 
dry run. You should add the --dry-run " +
+            "option explicitly if you are scripting this command and want to 
keep the current default behavior " +
+            "without prompting.")
+        }
+
+        if (options.has(dryRunOpt) && options.has(executeOpt))
+          CommandLineUtils.printUsageAndDie(parser, s"Option $resetOffsetsOpt 
only accepts one of $executeOpt and $dryRunOpt")
+
         CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, 
allResetOffsetScenarioOpts - resetToOffsetOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, 
allResetOffsetScenarioOpts - resetToDatetimeOpt)
@@ -1040,7 +1052,7 @@ object ConsumerGroupCommand extends Logging {
         CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, 
allResetOffsetScenarioOpts - resetToCurrentOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, 
allResetOffsetScenarioOpts - resetShiftByOpt)
         CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, 
allResetOffsetScenarioOpts - resetFromFileOpt)
-
+      }
 
       // check invalid args
       CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, 
allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java 
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 4496876..31c69ee 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -91,6 +91,7 @@ public class StreamsResetter {
     private static OptionSpec<String> fromFileOption;
     private static OptionSpec<Long> shiftByOption;
     private static OptionSpecBuilder dryRunOption;
+    private static OptionSpecBuilder executeOption;
     private static OptionSpec<String> commandConfigOption;
 
     private OptionSet options = null;
@@ -109,6 +110,7 @@ public class StreamsResetter {
 
         try {
             parseArguments(args);
+
             final boolean dryRun = options.has(dryRunOption);
 
             final String groupId = options.valueOf(applicationIdOption);
@@ -207,6 +209,7 @@ public class StreamsResetter {
             .withRequiredArg()
             .ofType(String.class)
             .describedAs("file name");
+        executeOption = optionParser.accepts("execute", "Execute the 
command.");
         dryRunOption = optionParser.accepts("dry-run", "Display the actions 
that would be performed without executing the reset commands.");
 
         // TODO: deprecated in 1.0; can be removed eventually
@@ -219,6 +222,16 @@ public class StreamsResetter {
             throw e;
         }
 
+        if (options.has(executeOption) && options.has(dryRunOption)) {
+            CommandLineUtils.printUsageAndDie(optionParser, "Only one of 
--dry-run and --execute can be specified");
+        }
+
+        if (!options.has(executeOption) && !options.has(dryRunOption)) {
+            System.err.println("WARN: In a future major release, the default 
behavior of this command will be to " +
+                    "prompt the user before executing the reset. You should 
add the --execute option explicitly if " +
+                    "you are scripting this command and want to keep the 
current default behavior without prompting.");
+        }
+
         scala.collection.immutable.HashSet<OptionSpec<?>> allScenarioOptions = 
new scala.collection.immutable.HashSet<>();
         allScenarioOptions.$plus(toOffsetOption);
         allScenarioOptions.$plus(toDatetimeOption);
@@ -266,7 +279,6 @@ public class StreamsResetter {
                 notFoundInputTopics.add(topic);
             } else {
                 topicsToSubscribe.add(topic);
-
             }
         }
         for (final String topic : intermediateTopics) {
@@ -277,6 +289,28 @@ public class StreamsResetter {
             }
         }
 
+        if (!notFoundInputTopics.isEmpty()) {
+            System.out.println("Following input topics are not found, skipping 
them");
+            for (final String topic : notFoundInputTopics) {
+                System.out.println("Topic: " + topic);
+            }
+            topicNotFound = EXIT_CODE_ERROR;
+        }
+
+        if (!notFoundIntermediateTopics.isEmpty()) {
+            System.out.println("Following intermediate topics are not found, 
skipping them");
+            for (final String topic : notFoundIntermediateTopics) {
+                System.out.println("Topic:" + topic);
+            }
+            topicNotFound = EXIT_CODE_ERROR;
+        }
+
+        // Return early if there are no topics to reset (the consumer will 
raise an error if we
+        // try to poll with an empty subscription)
+        if (topicsToSubscribe.isEmpty()) {
+            return topicNotFound;
+        }
+
         final Properties config = new Properties();
         config.putAll(consumerConfig);
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -311,22 +345,6 @@ public class StreamsResetter {
                 }
                 client.commitSync();
             }
-
-            if (notFoundInputTopics.size() > 0) {
-                System.out.println("Following input topics are not found, 
skipping them");
-                for (final String topic : notFoundInputTopics) {
-                    System.out.println("Topic: " + topic);
-                }
-                topicNotFound = EXIT_CODE_ERROR;
-            }
-
-            if (notFoundIntermediateTopics.size() > 0) {
-                System.out.println("Following intermediate topics are not 
found, skipping them");
-                for (final String topic : notFoundIntermediateTopics) {
-                    System.out.println("Topic:" + topic);
-                }
-            }
-
         } catch (final Exception e) {
             System.err.println("ERROR: Resetting offsets failed.");
             throw e;
@@ -337,8 +355,8 @@ public class StreamsResetter {
 
     // visible for testing
     public void maybeSeekToEnd(final String groupId,
-                                final Consumer<byte[], byte[]> client,
-                                final Set<TopicPartition> 
intermediateTopicPartitions) {
+                               final Consumer<byte[], byte[]> client,
+                               final Set<TopicPartition> 
intermediateTopicPartitions) {
         if (intermediateTopicPartitions.size() > 0) {
             System.out.println("Following intermediate topics offsets will be 
reset to end (for consumer group " + groupId + ")");
             for (final TopicPartition topicPartition : 
intermediateTopicPartitions) {
diff --git 
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 4ca04b0..7c45109 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -74,6 +74,25 @@ class ConsumerGroupCommandTest extends 
KafkaServerTestHarness {
     oldConsumers += new OldConsumer(Whitelist(topic), consumerProps)
   }
 
+  def committedOffsets(topic: String = topic, group: String = group): 
Map[TopicPartition, Long] = {
+    val props = new Properties
+    props.put("bootstrap.servers", brokerList)
+    props.put("group.id", group)
+    val consumer = new KafkaConsumer(props, new StringDeserializer, new 
StringDeserializer)
+    try {
+      consumer.partitionsFor(topic).asScala.flatMap { partitionInfo =>
+        val tp = new TopicPartition(partitionInfo.topic, 
partitionInfo.partition)
+        val committed = consumer.committed(tp)
+        if (committed == null)
+          None
+        else
+          Some(tp -> committed.offset)
+      }.toMap
+    } finally {
+      consumer.close()
+    }
+  }
+
   def stopRandomOldConsumer(): Unit = {
     oldConsumers.head.stop()
   }
diff --git 
a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala 
b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index befe65c..3d8e895 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -16,9 +16,10 @@ import java.io.{BufferedWriter, File, FileWriter}
 import java.text.{ParseException, SimpleDateFormat}
 import java.util.{Calendar, Date, Properties}
 
-import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, 
ConsumerGroupService, KafkaConsumerGroupService}
+import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.Test
 
@@ -73,10 +74,6 @@ class ResetConsumerGroupOffsetTest extends 
ConsumerGroupCommandTest {
   val topic1 = "foo1"
   val topic2 = "foo2"
 
-  /**
-    * Implementations must override this method to return a set of 
KafkaConfigs. This method will be invoked for every
-    * test and should not reuse previous configurations unless they select 
their ports randomly when servers are started.
-    */
   override def generateConfigs: Seq[KafkaConfig] = {
     TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = 
false)
       .map(KafkaConfig.fromProps(_, overridingProps))  
@@ -84,469 +81,300 @@ class ResetConsumerGroupOffsetTest extends 
ConsumerGroupCommandTest {
 
   @Test
   def testResetOffsetsNotExistingGroup() {
-    addConsumerGroupExecutor(numConsumers = 1, topic1)
-
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", "missing.group", "--all-topics", "--to-current")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset == Map.empty
-    }, "Expected to have an empty assignations map.")
-
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", "missing.group", "--all-topics",
+      "--to-current", "--execute")
+    val consumerGroupCommand = getConsumerGroupService(args)
+    val resetOffsets = consumerGroupCommand.resetOffsets()
+    assertEquals(Map.empty, resetOffsets)
+    assertEquals(resetOffsets, committedOffsets(group = "missing.group"))
   }
 
   @Test
   def testResetOffsetsNewConsumerExistingTopic(): Unit = {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", "new.group", "--topic", topic1, "--to-offset", "50")
-    val opts = new ConsumerGroupCommandOptions(cgcArgs)
-    val consumerGroupCommand = new KafkaConsumerGroupService(opts)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists({ assignment => assignment._2.offset() == 50 })
-    }, "Expected the consumer group to reset to offset 1 (specific offset).")
-
-    printConsumerGroup("new.group")
-    adminZkClient.deleteTopic(topic1)
-    consumerGroupCommand.close()
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", "new.group", "--topic", topic,
+      "--to-offset", "50")
+    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
+    resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, 
dryRun = true)
+    resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50, 
group = "new.group")
   }
 
   @Test
   def testResetOffsetsToLocalDateTime() {
-    adminZkClient.createTopic(topic1, 1, 1)
-
     val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
     val calendar = Calendar.getInstance()
     calendar.add(Calendar.DATE, -1)
 
-    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
-
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--dry-run")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    val executor = addConsumerGroupExecutor(numConsumers = 1, topic1)
-
-    TestUtils.waitUntilTrue(() => {
-      val (_, assignmentsOption) = consumerGroupCommand.collectGroupOffsets()
-      assignmentsOption match {
-        case Some(assignments) =>
-          val sumOffset = assignments.filter(_.topic.exists(_ == topic1))
-            .filter(_.offset.isDefined)
-            .map(assignment => assignment.offset.get)
-            .foldLeft(0.toLong)(_ + _)
-          sumOffset == 100
-        case _ => false
-      }
-    }, "Expected that consumer group has consumed all messages from 
topic/partition.")
+    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
 
+    val executor = addConsumerGroupExecutor(numConsumers = 1, topic)
+    awaitConsumerProgress(count = 100L)
     executor.shutdown()
 
-    val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-datetime", 
format.format(calendar.getTime))
-    val consumerGroupCommand1 = getConsumerGroupService(cgcArgs1)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand1.resetOffsets()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
-    }, "Expected the consumer group to reset to when offset was 50.")
-
-    printConsumerGroup()
-
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--to-datetime", format.format(calendar.getTime), "--execute")
+    resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
   @Test
   def testResetOffsetsToZonedDateTime() {
-    adminZkClient.createTopic(topic1, 1, 1)
-    TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
-
     val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
-    val checkpoint = new Date()
-
-    TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
 
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--dry-run")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    val executor = addConsumerGroupExecutor(numConsumers = 1, topic1)
-
-    TestUtils.waitUntilTrue(() => {
-      val (_, assignmentsOption) = consumerGroupCommand.collectGroupOffsets()
-      assignmentsOption match {
-        case Some(assignments) =>
-          val sumOffset = (assignments.filter(_.topic.exists(_ == topic1))
-            .filter(_.offset.isDefined)
-            .map(assignment => assignment.offset.get) foldLeft 0.toLong)(_ + _)
-          sumOffset == 100
-        case _ => false
-      }
-    }, "Expected that consumer group has consumed all messages from 
topic/partition.")
+    TestUtils.produceMessages(servers, topic, 50, acks = 1, 100 * 1000)
+    val checkpoint = new Date()
+    TestUtils.produceMessages(servers, topic, 50, acks = 1, 100 * 1000)
 
+    val executor = addConsumerGroupExecutor(numConsumers = 1, topic)
+    awaitConsumerProgress(count = 100L)
     executor.shutdown()
 
-    val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-datetime", format.format(checkpoint))
-    val consumerGroupCommand1 = getConsumerGroupService(cgcArgs1)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand1.resetOffsets()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 50 }
-    }, "Expected the consumer group to reset to when offset was 50.")
-
-    printConsumerGroup()
-
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--to-datetime", format.format(checkpoint), "--execute")
+    resetAndAssertOffsets(args, expectedOffset = 50)
   }
 
   @Test
   def testResetOffsetsByDuration() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--by-duration", "PT1M")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.waitUntilTrue(() => {
-        val assignmentsToReset = consumerGroupCommand.resetOffsets()
-        assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
-    }, "Expected the consumer group to reset to offset 0 (earliest by 
duration).")
-
-    printConsumerGroup()
-
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--by-duration", "PT1M", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
   @Test
   def testResetOffsetsByDurationToEarliest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--by-duration", "PT0.1S")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 100 }
-    }, "Expected the consumer group to reset to offset 100 (latest by 
duration).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--by-duration", "PT0.1S", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    resetAndAssertOffsets(args, expectedOffset = 100)
   }
 
   @Test
   def testResetOffsetsToEarliest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-earliest")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
-    }, "Expected the consumer group to reset to offset 0 (earliest).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--to-earliest", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
   @Test
   def testResetOffsetsToLatest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-latest")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
-
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists({ assignment => assignment._2.offset() == 200 
})
-    }, "Expected the consumer group to reset to offset 200 (latest).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--to-latest", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    resetAndAssertOffsets(args, expectedOffset = 200)
   }
 
   @Test
   def testResetOffsetsToCurrentOffset() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-current")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists({ assignment => assignment._2.offset() == 100 
})
-    }, "Expected the consumer group to reset to offset 100 (current).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
-  }
-
-  private def produceConsumeAndShutdown(consumerGroupCommand: 
ConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
-    TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 
1000)
-    val executor =  addConsumerGroupExecutor(numConsumers, topic)
-
-    TestUtils.waitUntilTrue(() => {
-      val (_, assignmentsOption) = consumerGroupCommand.collectGroupOffsets()
-      assignmentsOption match {
-        case Some(assignments) =>
-          val sumOffset = assignments.filter(_.topic.exists(_ == topic))
-            .filter(_.offset.isDefined)
-            .map(assignment => assignment.offset.get)
-            .foldLeft(0.toLong)(_ + _)
-          sumOffset == totalMessages
-        case _ => false
-      }
-    }, "Expected the consumer group to consume all messages from topic.")
-
-    executor.shutdown()
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--to-current", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    resetAndAssertOffsets(args, expectedOffset = 100)
   }
 
   @Test
   def testResetOffsetsToSpecificOffset() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-offset", "1")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists({ assignment => assignment._2.offset() == 1 })
-    }, "Expected the consumer group to reset to offset 1 (specific offset).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--to-offset", "1", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    resetAndAssertOffsets(args, expectedOffset = 1)
   }
 
   @Test
   def testResetOffsetsShiftPlus() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--shift-by", "50")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists({ assignment => assignment._2.offset() == 150 
})
-    }, "Expected the consumer group to reset to offset 150 (current + 50).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--shift-by", "50", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    resetAndAssertOffsets(args, expectedOffset = 150)
   }
 
   @Test
   def testResetOffsetsShiftMinus() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--shift-by", "-50")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
-
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists({ assignment => assignment._2.offset() == 50 })
-    }, "Expected the consumer group to reset to offset 50 (current - 50).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--shift-by", "-50", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    resetAndAssertOffsets(args, expectedOffset = 50)
   }
 
   @Test
   def testResetOffsetsShiftByLowerThanEarliest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--shift-by", "-150")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists({ assignment => assignment._2.offset() == 0 })
-    }, "Expected the consumer group to reset to offset 0 (earliest by shift).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--shift-by", "-150", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
   @Test
   def testResetOffsetsShiftByHigherThanLatest() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--shift-by", "150")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists({ assignment => assignment._2.offset() == 200 
})
-    }, "Expected the consumer group to reset to offset 200 (latest by shift).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--shift-by", "150", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    TestUtils.produceMessages(servers, topic, 100, acks = 1, 100 * 1000)
+    resetAndAssertOffsets(args, expectedOffset = 200)
   }
 
   @Test
   def testResetOffsetsToEarliestOnOneTopic() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--topic", topic1, "--to-earliest")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
-    adminZkClient.createTopic(topic1, 1, 1)
-
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 }
-    }, "Expected the consumer group to reset to offset 0 (earliest).")
-
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--topic", topic,
+      "--to-earliest", "--execute")
+    produceConsumeAndShutdown(totalMessages = 100)
+    resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
   @Test
   def testResetOffsetsToEarliestOnOneTopicAndPartition() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
+    val topic = "bar"
+    adminZkClient.createTopic(topic, 2, 1)
 
-    adminZkClient.createTopic(topic1, 2, 1)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--topic",
+      s"$topic:1", "--to-earliest", "--execute")
+    val consumerGroupCommand = getConsumerGroupService(args)
 
-    produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
+    produceConsumeAndShutdown(totalMessages = 100, numConsumers = 2, topic)
+    val priorCommittedOffsets = committedOffsets(topic = topic)
 
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && 
assignment._1.partition() == 1 }
-    }, "Expected the consumer group to reset to offset 0 (earliest) in 
partition 1.")
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
+    val expectedOffsets = Map(tp0 -> priorCommittedOffsets(tp0), tp1 -> 0L)
+    resetAndAssertOffsetsCommitted(consumerGroupCommand, expectedOffsets, 
topic)
 
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+    adminZkClient.deleteTopic(topic)
   }
 
   @Test
   def testResetOffsetsToEarliestOnTopics() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets",
-      "--group", group,
-      "--topic", topic1,
-      "--topic", topic2,
-      "--to-earliest")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
-
+    val topic1 = "topic1"
+    val topic2 = "topic2"
     adminZkClient.createTopic(topic1, 1, 1)
     adminZkClient.createTopic(topic2, 1, 1)
 
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
-    produceConsumeAndShutdown(consumerGroupCommand, 1, topic2, 100)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--topic", topic1,
+      "--topic", topic2, "--to-earliest", "--execute")
+    val consumerGroupCommand = getConsumerGroupService(args)
 
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && 
assignment._1.topic() == topic1 } &&
-        assignmentsToReset.exists { assignment => assignment._2.offset() == 0 
&& assignment._1.topic() == topic2 }
-    }, "Expected the consumer group to reset to offset 0 (earliest).")
+    produceConsumeAndShutdown(100, 1, topic1)
+    produceConsumeAndShutdown(100, 1, topic2)
+
+    val tp1 = new TopicPartition(topic1, 0)
+    val tp2 = new TopicPartition(topic2, 0)
+
+    val allResetOffsets = resetOffsets(consumerGroupCommand)
+    assertEquals(Map(tp1 -> 0L, tp2 -> 0L), allResetOffsets)
+    assertEquals(Map(tp1 -> 0L), committedOffsets(topic1))
+    assertEquals(Map(tp2 -> 0L), committedOffsets(topic2))
 
-    printConsumerGroup()
     adminZkClient.deleteTopic(topic1)
     adminZkClient.deleteTopic(topic2)
   }
 
   @Test
   def testResetOffsetsToEarliestOnTopicsAndPartitions() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets",
-      "--group", group,
-      "--topic", String.format("%s:1", topic1),
-      "--topic", String.format("%s:1", topic2),
-      "--to-earliest")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
+    val topic1 = "topic1"
+    val topic2 = "topic2"
 
     adminZkClient.createTopic(topic1, 2, 1)
     adminZkClient.createTopic(topic2, 2, 1)
 
-    produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
-    produceConsumeAndShutdown(consumerGroupCommand, 2, topic2, 100)
+    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--topic",
+      s"$topic1:1", "--topic", s"$topic2:1", "--to-earliest", "--execute")
+    val consumerGroupCommand = getConsumerGroupService(args)
 
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && 
assignment._1.partition() == 1 && assignment._1.topic() == topic1 }
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 0 && 
assignment._1.partition() == 1 && assignment._1.topic() == topic2 }
-    }, "Expected the consumer group to reset to offset 0 (earliest) in 
partition 1.")
+    produceConsumeAndShutdown(100, 2, topic1)
+    produceConsumeAndShutdown(100, 2, topic2)
+
+    val priorCommittedOffsets1 = committedOffsets(topic1)
+    val priorCommittedOffsets2 = committedOffsets(topic2)
+
+    val tp1 = new TopicPartition(topic1, 1)
+    val tp2 = new TopicPartition(topic2, 1)
+    val allResetOffsets = resetOffsets(consumerGroupCommand)
+    assertEquals(Map(tp1 -> 0, tp2 -> 0), allResetOffsets)
+
+    assertEquals(priorCommittedOffsets1 + (tp1 -> 0L), 
committedOffsets(topic1))
+    assertEquals(priorCommittedOffsets2 + (tp2 -> 0L), 
committedOffsets(topic2))
 
-    printConsumerGroup()
     adminZkClient.deleteTopic(topic1)
     adminZkClient.deleteTopic(topic2)
   }
 
   @Test
   def testResetOffsetsExportImportPlan() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics", "--to-offset","2", "--export")
-    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
+    val topic = "bar"
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
+    adminZkClient.createTopic(topic, 2, 1)
 
-    adminZkClient.createTopic(topic1, 2, 1)
+    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", 
"--group", group, "--all-topics",
+      "--to-offset", "2", "--export")
+    val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
-    produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
+    produceConsumeAndShutdown(100, 2, topic)
 
     val file = File.createTempFile("reset", ".csv")
+    file.deleteOnExit()
 
-    TestUtils.waitUntilTrue(() => {
-      val assignmentsToReset = consumerGroupCommand.resetOffsets()
-      val bw = new BufferedWriter(new FileWriter(file))
-      bw.write(consumerGroupCommand.exportOffsetsToReset(assignmentsToReset))
-      bw.close()
-      assignmentsToReset.exists { assignment => assignment._2.offset() == 2 } 
&& file.exists()
-    }, "Expected the consume all messages and save reset offsets plan to file")
-
+    val exportedOffsets = consumerGroupCommand.resetOffsets()
+    val bw = new BufferedWriter(new FileWriter(file))
+    bw.write(consumerGroupCommand.exportOffsetsToReset(exportedOffsets))
+    bw.close()
+    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), 
exportedOffsets.mapValues(_.offset))
 
-    val cgcArgsExec = Array("--bootstrap-server", brokerList, 
"--reset-offsets", "--group", group, "--all-topics", "--from-file", 
file.getCanonicalPath, "--dry-run")
+    val cgcArgsExec = Array("--bootstrap-server", brokerList, 
"--reset-offsets", "--group", group, "--all-topics",
+      "--from-file", file.getCanonicalPath, "--dry-run")
     val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
+    val importedOffsets = consumerGroupCommandExec.resetOffsets()
+    assertEquals(Map(tp0 -> 2L, tp1 -> 2L), 
importedOffsets.mapValues(_.offset))
 
-    TestUtils.waitUntilTrue(() => {
-        val assignmentsToReset = consumerGroupCommandExec.resetOffsets()
-        assignmentsToReset.exists { assignment => assignment._2.offset() == 2 }
-    }, "Expected the consumer group to reset to offset 2 according to the plan 
in the file.")
+    adminZkClient.deleteTopic(topic)
+  }
 
-    file.deleteOnExit()
+  private def produceConsumeAndShutdown(totalMessages: Int, numConsumers: Int 
= 1, topic: String = topic) {
+    TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 
1000)
+    val executor =  addConsumerGroupExecutor(numConsumers, topic)
+    awaitConsumerProgress(topic, totalMessages)
+    executor.shutdown()
+  }
 
-    printConsumerGroup()
-    adminZkClient.deleteTopic(topic1)
+  private def awaitConsumerProgress(topic: String = topic, count: Long): Unit 
= {
+    TestUtils.waitUntilTrue(() => {
+      val offsets = committedOffsets(topic).values
+      count == offsets.sum
+    }, "Expected that consumer group has consumed all messages from 
topic/partition.")
   }
 
-  private def printConsumerGroup() {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--group", group, 
"--describe")
-    ConsumerGroupCommand.main(cgcArgs)
+  private def resetAndAssertOffsets(args: Array[String],
+                                           expectedOffset: Long,
+                                           group: String = group,
+                                           dryRun: Boolean = false): Unit = {
+    val consumerGroupCommand = getConsumerGroupService(args)
+    try {
+      val priorOffsets = committedOffsets(group = group)
+      val expectedOffsets = Map(new TopicPartition(topic, 0) -> expectedOffset)
+      assertEquals(expectedOffsets, resetOffsets(consumerGroupCommand))
+      assertEquals(if (dryRun) priorOffsets else expectedOffsets, 
committedOffsets(group = group))
+    } finally {
+      consumerGroupCommand.close()
+    }
+  }
+
+  private def resetAndAssertOffsetsCommitted(consumerGroupService: 
ConsumerGroupService,
+                                             expectedOffsets: 
Map[TopicPartition, Long],
+                                             topic: String = topic): Unit = {
+    val allResetOffsets = resetOffsets(consumerGroupService)
+    allResetOffsets.foreach { case (tp, offset) =>
+      assertEquals(offset, expectedOffsets(tp))
+    }
+    assertEquals(expectedOffsets, committedOffsets(topic))
   }
 
-  private def printConsumerGroup(group: String) {
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--group", group, 
"--describe")
-    ConsumerGroupCommand.main(cgcArgs)
+  private def resetOffsets(consumerGroupService: ConsumerGroupService): 
Map[TopicPartition, Long] = {
+    consumerGroupService.resetOffsets().mapValues(_.offset)
   }
 
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 758c4f6..8a82bf9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -219,7 +219,9 @@ public abstract class AbstractResetIntegrationTest {
         final String[] parameters = new String[] {
             "--application-id", appID,
             "--bootstrap-servers", cluster.bootstrapServers(),
-            "--input-topics", NON_EXISTING_TOPIC };
+            "--input-topics", NON_EXISTING_TOPIC,
+            "--execute"
+        };
         final Properties cleanUpConfig = new Properties();
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
CLEANUP_CONSUMER_TIMEOUT);
@@ -241,7 +243,9 @@ public abstract class AbstractResetIntegrationTest {
         final String[] parameters = new String[] {
             "--application-id", appID,
             "--bootstrap-servers", cluster.bootstrapServers(),
-            "--input-topics", NON_EXISTING_TOPIC };
+            "--input-topics", NON_EXISTING_TOPIC,
+            "--execute"
+        };
         final Properties cleanUpConfig = new Properties();
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
CLEANUP_CONSUMER_TIMEOUT);
@@ -255,7 +259,9 @@ public abstract class AbstractResetIntegrationTest {
         final String[] parameters = new String[] {
             "--application-id", appID,
             "--bootstrap-servers", cluster.bootstrapServers(),
-            "--input-topics", NON_EXISTING_TOPIC };
+            "--intermediate-topics", NON_EXISTING_TOPIC,
+            "--execute"
+        };
         final Properties cleanUpConfig = new Properties();
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
         cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
CLEANUP_CONSUMER_TIMEOUT);
@@ -548,8 +554,9 @@ public abstract class AbstractResetIntegrationTest {
         // leaving --zookeeper arg here to ensure tool works if users add it
         final List<String> parameterList = new ArrayList<>(
             Arrays.asList("--application-id", appID,
-                "--bootstrap-servers", cluster.bootstrapServers(),
-                "--input-topics", INPUT_TOPIC));
+                    "--bootstrap-servers", cluster.bootstrapServers(),
+                    "--input-topics", INPUT_TOPIC,
+                    "--execute"));
         if (withIntermediateTopics) {
             parameterList.add("--intermediate-topics");
             parameterList.add(INTERMEDIATE_USER_TOPIC);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index dd32ad0..ad19f32 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -43,9 +43,6 @@ import java.util.concurrent.ExecutionException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-/**
- *
- */
 public class StreamsResetterTest {
 
     private static final String TOPIC = "topic1";

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to