fjy closed pull request #6168: Fix NPE for taskGroupId when rolling update
URL: https://github.com/apache/incubator-druid/pull/6168
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
index fedda092c4d..d499b4f0e8d 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -53,6 +53,7 @@
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -239,7 +240,12 @@ public void reset(DataSourceMetadata dataSourceMetadata)
   }
 
   @Override
-  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
+  public void checkpoint(
+      @Nullable Integer taskGroupId,
+      String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  )
   {
     // do nothing
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index a93fde611c5..3a385072c30 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -603,6 +603,7 @@ public void onFailure(Throwable t)
             final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction(
                 task.getDataSource(),
                 ioConfig.getTaskGroupId(),
+                task.getIOConfig().getBaseSequenceName(),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())),
                 new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets))
             );
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index e511b00dcd7..8c9bb599ada 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -92,6 +92,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
@@ -143,7 +144,7 @@
   * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups]
   * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]).
    */
-  private static class TaskGroup
+  private class TaskGroup
   {
     // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data
     // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in
@@ -157,6 +158,7 @@
     final Optional<DateTime> maximumMessageTime;
     DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action
     final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>();
+    final String baseSequenceName;
 
     TaskGroup(
         ImmutableMap<Integer, Long> partitionOffsets,
@@ -168,6 +170,7 @@
       this.minimumMessageTime = minimumMessageTime;
       this.maximumMessageTime = maximumMessageTime;
       this.sequenceOffsets.put(0, partitionOffsets);
+      this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime);
     }
 
     int addNewCheckpoint(Map<Integer, Long> checkpoint)
@@ -509,23 +512,29 @@ public void reset(DataSourceMetadata dataSourceMetadata)
   }
 
   @Override
-  public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint)
+  public void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  )
   {
-    Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint");
-    Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null");
+    Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint");
+    Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null");
     Preconditions.checkArgument(
-        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()),
+        ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()),
         "Supervisor topic [%s] and topic in checkpoint [%s] does not match",
         ioConfig.getTopic(),
-        ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()
+        ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()
     );
 
-    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId);
+    log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId);
     notices.add(
         new CheckpointNotice(
             taskGroupId,
-            (KafkaDataSourceMetadata) previousCheckpoint,
-            (KafkaDataSourceMetadata) currentCheckpoint
+            baseSequenceName,
+            (KafkaDataSourceMetadata) previousCheckPoint,
+            (KafkaDataSourceMetadata) currentCheckPoint
         )
     );
   }
@@ -629,17 +638,20 @@ public void handle()
 
   private class CheckpointNotice implements Notice
   {
-    final int taskGroupId;
-    final KafkaDataSourceMetadata previousCheckpoint;
-    final KafkaDataSourceMetadata currentCheckpoint;
+    @Nullable private final Integer nullableTaskGroupId;
+    @Deprecated private final String baseSequenceName;
+    private final KafkaDataSourceMetadata previousCheckpoint;
+    private final KafkaDataSourceMetadata currentCheckpoint;
 
     CheckpointNotice(
-        int taskGroupId,
+        @Nullable Integer nullableTaskGroupId,
+        @Deprecated String baseSequenceName,
         KafkaDataSourceMetadata previousCheckpoint,
         KafkaDataSourceMetadata currentCheckpoint
     )
     {
-      this.taskGroupId = taskGroupId;
+      this.baseSequenceName = baseSequenceName;
+      this.nullableTaskGroupId = nullableTaskGroupId;
       this.previousCheckpoint = previousCheckpoint;
       this.currentCheckpoint = currentCheckpoint;
     }
@@ -647,12 +659,44 @@ public void handle()
     @Override
     public void handle() throws ExecutionException, InterruptedException
     {
+      // Find the taskGroupId from the baseSequenceName if taskGroupId is null. It can be null during a rolling update.
+      final int taskGroupId;
+      if (nullableTaskGroupId == null) {
+        // We search for the baseSequenceName in taskGroups and pendingCompletionTaskGroups sequentially. This should be fine because
+        // 1) a taskGroup can be moved from taskGroups to pendingCompletionTaskGroups in RunNotice
+        //    (see checkTaskDuration()).
+        // 2) Notices are processed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the
+        //    same time.
+        final java.util.Optional<Integer> maybeGroupId = taskGroups
+            .entrySet()
+            .stream()
+            .filter(entry -> {
+              final TaskGroup taskGroup = entry.getValue();
+              return taskGroup.baseSequenceName.equals(baseSequenceName);
+            })
+            .findAny()
+            .map(Entry::getKey);
+        taskGroupId = maybeGroupId.orElse(
+            pendingCompletionTaskGroups
+                .entrySet()
+                .stream()
+                .filter(entry -> {
+                  final List<TaskGroup> taskGroups = entry.getValue();
+                  return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName));
+                })
+                .findAny()
+                .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName))
+                .getKey()
+        );
+      } else {
+        taskGroupId = nullableTaskGroupId;
+      }
+
       // check for consistency
      // if already received request for this sequenceName and dataSourceMetadata combination then return
-
       final TaskGroup taskGroup = taskGroups.get(taskGroupId);
 
-      if (isValidTaskGroup(taskGroup)) {
+      if (isValidTaskGroup(taskGroupId, taskGroup)) {
        final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets;
 
         // check validity of previousCheckpoint
@@ -674,20 +718,13 @@ public void handle() throws ExecutionException, InterruptedException
           log.info("Already checkpointed with offsets [%s]", 
checkpoints.lastEntry().getValue());
           return;
         }
-        final int taskGroupId = getTaskGroupIdForPartition(
-            currentCheckpoint.getKafkaPartitions()
-                             .getPartitionOffsetMap()
-                             .keySet()
-                             .iterator()
-                             .next()
-        );
        final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get();
         taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint);
         log.info("Handled checkpoint notice, new checkpoint is [%s] for 
taskGroup [%s]", newCheckpoint, taskGroupId);
       }
     }
 
-    private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup)
+    private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup)
     {
       if (taskGroup == null) {
         // taskGroup might be in pendingCompletionTaskGroups or partitionGroups
@@ -886,17 +923,6 @@ String generateSequenceName(
     return Joiner.on("_").join("index_kafka", dataSource, hashCode);
   }
 
-  @VisibleForTesting
-  String generateSequenceName(TaskGroup taskGroup)
-  {
-    Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null");
-    return generateSequenceName(
-        taskGroup.partitionOffsets,
-        taskGroup.minimumMessageTime,
-        taskGroup.maximumMessageTime
-    );
-  }
-
   private static String getRandomId()
   {
     final StringBuilder suffix = new StringBuilder(8);
@@ -1774,7 +1800,6 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
       endPartitions.put(partition, Long.MAX_VALUE);
     }
     TaskGroup group = taskGroups.get(groupId);
-    String sequenceName = generateSequenceName(group);
 
    Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
    DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();
@@ -1782,7 +1807,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
 
     KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
         groupId,
-        sequenceName,
+        group.baseSequenceName,
         new KafkaPartitions(ioConfig.getTopic(), startPartitions),
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
         consumerProperties,
@@ -1803,10 +1828,10 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
                                             .putAll(spec.getContext())
                                             .build();
     for (int i = 0; i < replicas; i++) {
-      String taskId = Joiner.on("_").join(sequenceName, getRandomId());
+      String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId());
       KafkaIndexTask indexTask = new KafkaIndexTask(
           taskId,
-          new TaskResource(sequenceName, 1),
+          new TaskResource(group.baseSequenceName, 1),
           spec.getDataSchema(),
           taskTuningConfig,
           kafkaIOConfig,
@@ -1936,7 +1961,10 @@ private boolean isTaskCurrent(int taskGroupId, String taskId)
 
    String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName();
     if (taskGroups.get(taskGroupId) != null) {
-      return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName);
+      return Preconditions
+          .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
+          .baseSequenceName
+          .equals(taskSequenceName);
     } else {
       return generateSequenceName(
           ((KafkaIndexTask) taskOptional.get()).getIOConfig()
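
For clarity, here is a simplified, self-contained sketch of the fallback that the CheckpointNotice.handle() change above performs: when a notice carries no taskGroupId (as happens when a pre-upgrade task sends the checkpoint action during a rolling update), the group is resolved by matching the task's baseSequenceName first against the actively-reading groups and then the pending-completion groups. Plain JDK maps stand in for Druid's TaskGroup bookkeeping, and all names below are illustrative. Note the sketch uses orElseGet, so the pending-completion search (and the exception it may throw) is evaluated only when the active lookup misses.

import java.util.List;
import java.util.Map;
import java.util.Optional;

class GroupIdFallbackSketch
{
  // Resolve a task group id from a base sequence name: search the
  // actively-reading groups first, then the pending-completion groups.
  static int findTaskGroupId(
      String baseSequenceName,
      Map<Integer, String> activeGroups,        // groupId -> baseSequenceName
      Map<Integer, List<String>> pendingGroups  // groupId -> baseSequenceNames of pending groups
  )
  {
    final Optional<Integer> active = activeGroups
        .entrySet()
        .stream()
        .filter(e -> e.getValue().equals(baseSequenceName))
        .findAny()
        .map(Map.Entry::getKey);

    return active.orElseGet(
        () -> pendingGroups
            .entrySet()
            .stream()
            .filter(e -> e.getValue().contains(baseSequenceName))
            .findAny()
            .map(Map.Entry::getKey)
            .orElseThrow(() -> new IllegalStateException(
                "Cannot find taskGroup for baseSequenceName[" + baseSequenceName + "]"
            ))
    );
  }

  public static void main(String[] args)
  {
    // The group is active under id 0, so the pending search is never evaluated.
    int groupId = findTaskGroupId(
        "index_kafka_ds_0123456789abcdef",
        Map.of(0, "index_kafka_ds_0123456789abcdef"),
        Map.of()
    );
    System.out.println(groupId); // prints 0
  }
}
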
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1929bb45132..c3cf1153671 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2060,7 +2060,8 @@ private void makeToolboxFactory() throws IOException
           @Override
           public boolean checkPointDataSourceMetadata(
               String supervisorId,
-              int taskGroupId,
+              @Nullable Integer taskGroupId,
+              String baseSequenceName,
               @Nullable DataSourceMetadata previousDataSourceMetadata,
               @Nullable DataSourceMetadata currentDataSourceMetadata
           )
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 36f3d733d6f..83e253ff9f1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -2104,6 +2104,7 @@ public void testCheckpointForInactiveTaskGroup()
     supervisor.moveTaskGroupToPendingCompletion(0);
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
        new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
        new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints))
     );
@@ -2173,6 +2174,7 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException
 
     supervisor.checkpoint(
         0,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())),
        new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap()))
     );
@@ -2195,13 +2197,100 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException
     Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass());
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointWithNullTaskGroupId()
+      throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
+  {
+    supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+    //not adding any events
+    final Task id1 = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id2 = createKafkaIndexTask(
+        "id2",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    final Task id3 = createKafkaIndexTask(
+        "id3",
+        DATASOURCE,
+        0,
+        new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+        new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes();
+    expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
+    expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+    expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
+    expect(
+        indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null)
+    ).anyTimes();
+    taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
+    expect(taskClient.getStatusAsync(anyString()))
+        .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING))
+        .anyTimes();
+    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, ImmutableMap.of(0, 0L));
+    expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean()))
+        .andReturn(Futures.immediateFuture(checkpoints))
+        .times(3);
+    expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes();
+    expect(taskClient.pauseAsync(anyString()))
+        .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L)))
+        .anyTimes();
+    expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean()))
+        .andReturn(Futures.immediateFuture(true))
+        .anyTimes();
+
+    replayAll();
+
+    supervisor.start();
+
+    supervisor.runInternal();
+
+    final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new TreeMap<>();
+    newCheckpoints.put(0, ImmutableMap.of(0, 10L));
+    supervisor.checkpoint(
+        null,
+        ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0)))
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+  }
+
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
       for (int i = 0; i < NUM_PARTITIONS; i++) {
         for (int j = 0; j < numEventsPerPartition; j++) {
           kafkaProducer.send(
-              new ProducerRecord<byte[], byte[]>(
+              new ProducerRecord<>(
                   topic,
                   i,
                   null,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
index f1d11deb4ea..433af987be3 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java
@@ -25,22 +25,29 @@
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.DataSourceMetadata;
 
+import javax.annotation.Nullable;
+
 public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
 {
   private final String supervisorId;
-  private final int taskGroupId;
+  @Nullable
+  private final Integer taskGroupId;
+  @Deprecated
+  private final String baseSequenceName;
   private final DataSourceMetadata previousCheckPoint;
   private final DataSourceMetadata currentCheckPoint;
 
   public CheckPointDataSourceMetadataAction(
       @JsonProperty("supervisorId") String supervisorId,
-      @JsonProperty("taskGroupId") Integer taskGroupId,
+      @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable 
for backward compatibility,
+      @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // 
old version would use this
       @JsonProperty("previousCheckPoint") DataSourceMetadata 
previousCheckPoint,
       @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
   )
   {
    this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
-    this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId");
+    this.taskGroupId = taskGroupId;
+    this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName");
    this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
    this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
   }
@@ -51,8 +58,16 @@ public String getSupervisorId()
     return supervisorId;
   }
 
+  @Deprecated
+  @JsonProperty("sequenceName")
+  public String getBaseSequenceName()
+  {
+    return baseSequenceName;
+  }
+
+  @Nullable
   @JsonProperty
-  public int getTaskGroupId()
+  public Integer getTaskGroupId()
   {
     return taskGroupId;
   }
@@ -85,6 +100,7 @@ public Boolean perform(
     return toolbox.getSupervisorManager().checkPointDataSourceMetadata(
         supervisorId,
         taskGroupId,
+        baseSequenceName,
         previousCheckPoint,
         currentCheckPoint
     );
@@ -101,6 +117,7 @@ public String toString()
   {
     return "CheckPointDataSourceMetadataAction{" +
            "supervisorId='" + supervisorId + '\'' +
+           ", baseSequenceName='" + baseSequenceName + '\'' +
            ", taskGroupId='" + taskGroupId + '\'' +
            ", previousCheckPoint=" + previousCheckPoint +
            ", currentCheckPoint=" + currentCheckPoint +
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
index 355465cec5d..03cc96f5b0b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -165,7 +165,8 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc
 
   public boolean checkPointDataSourceMetadata(
       String supervisorId,
-      int taskGroupId,
+      @Nullable Integer taskGroupId,
+      String baseSequenceName,
       DataSourceMetadata previousDataSourceMetadata,
       DataSourceMetadata currentDataSourceMetadata
   )
@@ -178,7 +179,7 @@ public boolean checkPointDataSourceMetadata(
 
       Preconditions.checkNotNull(supervisor, "supervisor could not be found");
 
-      supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata);
+      supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata);
       return true;
     }
     catch (Exception e) {
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index 0408104cde8..ccf695e41a5 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -83,7 +83,8 @@ public void reset(DataSourceMetadata dataSourceMetadata) {}
 
       @Override
       public void checkpoint(
-          int taskGroupId,
+          @Nullable Integer taskGroupId,
+          String baseSequenceName,
           DataSourceMetadata previousCheckPoint,
           DataSourceMetadata currentCheckPoint
       )
diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
index 04afac7aea6..6f3c05003a8 100644
--- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableMap;
 import io.druid.indexing.overlord.DataSourceMetadata;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 public interface Supervisor
@@ -52,8 +53,14 @@
    * represented by {@param currentCheckpoint} DataSourceMetadata
    *
   * @param taskGroupId        unique identifier to figure out for which sequence to do checkpointing
+   * @param baseSequenceName   base sequence name of the task group; used to find the task group when taskGroupId is null
    * @param previousCheckPoint DataSourceMetadata checkpointed in previous call
    * @param currentCheckPoint  current DataSourceMetadata to be checkpointed
    */
-  void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint);
+  void checkpoint(
+      @Nullable Integer taskGroupId,
+      @Deprecated String baseSequenceName,
+      DataSourceMetadata previousCheckPoint,
+      DataSourceMetadata currentCheckPoint
+  );
 }


 
