fjy closed pull request #6168: Fix NPE for taskGroupId when rolling update URL: https://github.com/apache/incubator-druid/pull/6168
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java index fedda092c4d..d499b4f0e8d 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -53,6 +53,7 @@ import org.joda.time.Duration; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -239,7 +240,12 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint) + public void checkpoint( + @Nullable Integer taskGroupId, + String baseSequenceName, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ) { // do nothing } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index a93fde611c5..3a385072c30 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -603,6 +603,7 @@ public void onFailure(Throwable t) final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( task.getDataSource(), ioConfig.getTaskGroupId(), + task.getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e511b00dcd7..8c9bb599ada 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -92,6 +92,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Random; import java.util.Set; @@ -143,7 +144,7 @@ * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups] * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]). */ - private static class TaskGroup + private class TaskGroup { // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in @@ -157,6 +158,7 @@ final Optional<DateTime> maximumMessageTime; DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action final TreeMap<Integer, Map<Integer, Long>> sequenceOffsets = new TreeMap<>(); + final String baseSequenceName; TaskGroup( ImmutableMap<Integer, Long> partitionOffsets, @@ -168,6 +170,7 @@ this.minimumMessageTime = minimumMessageTime; this.maximumMessageTime = maximumMessageTime; this.sequenceOffsets.put(0, partitionOffsets); + this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime); } int addNewCheckpoint(Map<Integer, Long> checkpoint) @@ -509,23 +512,29 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint) + public void checkpoint( + @Nullable Integer taskGroupId, + @Deprecated String baseSequenceName, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ) { - Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint"); - Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null"); + Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint"); + Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null"); Preconditions.checkArgument( - ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()), + ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()), "Supervisor topic [%s] and topic in checkpoint [%s] does not match", ioConfig.getTopic(), - ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic() + ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic() ); - log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId); + log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId); notices.add( new CheckpointNotice( taskGroupId, - (KafkaDataSourceMetadata) previousCheckpoint, - (KafkaDataSourceMetadata) currentCheckpoint + baseSequenceName, + (KafkaDataSourceMetadata) previousCheckPoint, + (KafkaDataSourceMetadata) currentCheckPoint ) ); } @@ -629,17 +638,20 @@ public void handle() private class CheckpointNotice implements Notice { - final int taskGroupId; - final KafkaDataSourceMetadata previousCheckpoint; - final KafkaDataSourceMetadata currentCheckpoint; + @Nullable private final Integer nullableTaskGroupId; + @Deprecated private final String baseSequenceName; + private final KafkaDataSourceMetadata previousCheckpoint; + private final KafkaDataSourceMetadata currentCheckpoint; CheckpointNotice( - int taskGroupId, + @Nullable Integer nullableTaskGroupId, + @Deprecated String baseSequenceName, KafkaDataSourceMetadata previousCheckpoint, KafkaDataSourceMetadata currentCheckpoint ) { - this.taskGroupId = taskGroupId; + this.baseSequenceName = baseSequenceName; + this.nullableTaskGroupId = nullableTaskGroupId; this.previousCheckpoint = previousCheckpoint; this.currentCheckpoint = currentCheckpoint; } @@ -647,12 +659,44 @@ public void handle() @Override public void handle() throws ExecutionException, InterruptedException { + // Find taskGroupId using taskId if it's null. It can be null while rolling update. + final int taskGroupId; + if (nullableTaskGroupId == null) { + // We search taskId in taskGroups and pendingCompletionTaskGroups sequentially. This should be fine because + // 1) a taskGroup can be moved from taskGroups to pendingCompletionTaskGroups in RunNotice + // (see checkTaskDuration()). + // 2) Notices are proceesed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the + // same time. + final java.util.Optional<Integer> maybeGroupId = taskGroups + .entrySet() + .stream() + .filter(entry -> { + final TaskGroup taskGroup = entry.getValue(); + return taskGroup.baseSequenceName.equals(baseSequenceName); + }) + .findAny() + .map(Entry::getKey); + taskGroupId = maybeGroupId.orElse( + pendingCompletionTaskGroups + .entrySet() + .stream() + .filter(entry -> { + final List<TaskGroup> taskGroups = entry.getValue(); + return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName)); + }) + .findAny() + .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName)) + .getKey() + ); + } else { + taskGroupId = nullableTaskGroupId; + } + // check for consistency // if already received request for this sequenceName and dataSourceMetadata combination then return - final TaskGroup taskGroup = taskGroups.get(taskGroupId); - if (isValidTaskGroup(taskGroup)) { + if (isValidTaskGroup(taskGroupId, taskGroup)) { final TreeMap<Integer, Map<Integer, Long>> checkpoints = taskGroup.sequenceOffsets; // check validity of previousCheckpoint @@ -674,20 +718,13 @@ public void handle() throws ExecutionException, InterruptedException log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue()); return; } - final int taskGroupId = getTaskGroupIdForPartition( - currentCheckpoint.getKafkaPartitions() - .getPartitionOffsetMap() - .keySet() - .iterator() - .next() - ); final Map<Integer, Long> newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint); log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId); } } - private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup) + private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup) { if (taskGroup == null) { // taskGroup might be in pendingCompletionTaskGroups or partitionGroups @@ -886,17 +923,6 @@ String generateSequenceName( return Joiner.on("_").join("index_kafka", dataSource, hashCode); } - @VisibleForTesting - String generateSequenceName(TaskGroup taskGroup) - { - Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null"); - return generateSequenceName( - taskGroup.partitionOffsets, - taskGroup.minimumMessageTime, - taskGroup.maximumMessageTime - ); - } - private static String getRandomId() { final StringBuilder suffix = new StringBuilder(8); @@ -1774,7 +1800,6 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc endPartitions.put(partition, Long.MAX_VALUE); } TaskGroup group = taskGroups.get(groupId); - String sequenceName = generateSequenceName(group); Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties()); DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull(); @@ -1782,7 +1807,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc KafkaIOConfig kafkaIOConfig = new KafkaIOConfig( groupId, - sequenceName, + group.baseSequenceName, new KafkaPartitions(ioConfig.getTopic(), startPartitions), new KafkaPartitions(ioConfig.getTopic(), endPartitions), consumerProperties, @@ -1803,10 +1828,10 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc .putAll(spec.getContext()) .build(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(sequenceName, getRandomId()); + String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId()); KafkaIndexTask indexTask = new KafkaIndexTask( taskId, - new TaskResource(sequenceName, 1), + new TaskResource(group.baseSequenceName, 1), spec.getDataSchema(), taskTuningConfig, kafkaIOConfig, @@ -1936,7 +1961,10 @@ private boolean isTaskCurrent(int taskGroupId, String taskId) String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName(); if (taskGroups.get(taskGroupId) != null) { - return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName); + return Preconditions + .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId) + .baseSequenceName + .equals(taskSequenceName); } else { return generateSequenceName( ((KafkaIndexTask) taskOptional.get()).getIOConfig() diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 1929bb45132..c3cf1153671 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2060,7 +2060,8 @@ private void makeToolboxFactory() throws IOException @Override public boolean checkPointDataSourceMetadata( String supervisorId, - int taskGroupId, + @Nullable Integer taskGroupId, + String baseSequenceName, @Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata currentDataSourceMetadata ) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 36f3d733d6f..83e253ff9f1 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2104,6 +2104,7 @@ public void testCheckpointForInactiveTaskGroup() supervisor.moveTaskGroupToPendingCompletion(0); supervisor.checkpoint( 0, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints)) ); @@ -2173,6 +2174,7 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException supervisor.checkpoint( 0, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())) ); @@ -2195,13 +2197,100 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); } + @Test(timeout = 60_000L) + public void testCheckpointWithNullTaskGroupId() + throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException + { + supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false); + //not adding any events + final Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + final Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + final Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect( + indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) + ).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + expect(taskClient.getStatusAsync(anyString())) + .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)) + .anyTimes(); + final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of(0, 0L)); + expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(3); + expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes(); + expect(taskClient.pauseAsync(anyString())) + .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L))) + .anyTimes(); + expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean())) + .andReturn(Futures.immediateFuture(true)) + .anyTimes(); + + replayAll(); + + supervisor.start(); + + supervisor.runInternal(); + + final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new TreeMap<>(); + newCheckpoints.put(0, ImmutableMap.of(0, 10L)); + supervisor.checkpoint( + null, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0))) + ); + + while (supervisor.getNoticesQueueSize() > 0) { + Thread.sleep(100); + } + + verifyAll(); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) { for (int i = 0; i < NUM_PARTITIONS; i++) { for (int j = 0; j < numEventsPerPartition; j++) { kafkaProducer.send( - new ProducerRecord<byte[], byte[]>( + new ProducerRecord<>( topic, i, null, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index f1d11deb4ea..433af987be3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -25,22 +25,29 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.DataSourceMetadata; +import javax.annotation.Nullable; + public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean> { private final String supervisorId; - private final int taskGroupId; + @Nullable + private final Integer taskGroupId; + @Deprecated + private final String baseSequenceName; private final DataSourceMetadata previousCheckPoint; private final DataSourceMetadata currentCheckPoint; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, - @JsonProperty("taskGroupId") Integer taskGroupId, + @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility, + @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint ) { this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); - this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId"); + this.taskGroupId = taskGroupId; + this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName"); this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint"); } @@ -51,8 +58,16 @@ public String getSupervisorId() return supervisorId; } + @Deprecated + @JsonProperty("sequenceName") + public String getBaseSequenceName() + { + return baseSequenceName; + } + + @Nullable @JsonProperty - public int getTaskGroupId() + public Integer getTaskGroupId() { return taskGroupId; } @@ -85,6 +100,7 @@ public Boolean perform( return toolbox.getSupervisorManager().checkPointDataSourceMetadata( supervisorId, taskGroupId, + baseSequenceName, previousCheckPoint, currentCheckPoint ); @@ -101,6 +117,7 @@ public String toString() { return "CheckPointDataSourceMetadataAction{" + "supervisorId='" + supervisorId + '\'' + + ", baseSequenceName='" + baseSequenceName + '\'' + ", taskGroupId='" + taskGroupId + '\'' + ", previousCheckPoint=" + previousCheckPoint + ", currentCheckPoint=" + currentCheckPoint + diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 355465cec5d..03cc96f5b0b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -165,7 +165,8 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc public boolean checkPointDataSourceMetadata( String supervisorId, - int taskGroupId, + @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousDataSourceMetadata, DataSourceMetadata currentDataSourceMetadata ) @@ -178,7 +179,7 @@ public boolean checkPointDataSourceMetadata( Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata); + supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata); return true; } catch (Exception e) { diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 0408104cde8..ccf695e41a5 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -83,7 +83,8 @@ public void reset(DataSourceMetadata dataSourceMetadata) {} @Override public void checkpoint( - int taskGroupId, + @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ) diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index 04afac7aea6..6f3c05003a8 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.indexing.overlord.DataSourceMetadata; +import javax.annotation.Nullable; import java.util.Map; public interface Supervisor @@ -52,8 +53,14 @@ * represented by {@param currentCheckpoint} DataSourceMetadata * * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing + * @param baseSequenceName baseSequenceName * @param previousCheckPoint DataSourceMetadata checkpointed in previous call * @param currentCheckPoint current DataSourceMetadata to be checkpointed */ - void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint); + void checkpoint( + @Nullable Integer taskGroupId, + @Deprecated String baseSequenceName, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org