fjy closed pull request #6155: Fix three bugs with segment publishing.
URL: https://github.com/apache/incubator-druid/pull/6155
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index a93fde611c5..f8d2e10c338 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -1749,7 +1749,7 @@ TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTr
 
         log.info("Publishing with isTransaction[%s].", useTransaction);
 
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
     }
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index b7cdd3e96e4..a9dff63aefc 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -518,7 +518,7 @@ public void run()
 
         log.info("Publishing with isTransaction[%s].", 
ioConfig.isUseTransaction());
 
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
 
        // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 9a0ea65175e..c04d7df646b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -297,7 +297,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
 
       final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
         final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-        return toolbox.getTaskActionClient().submit(action).isSuccess();
+        return toolbox.getTaskActionClient().submit(action);
       };
 
       // Skip connecting firehose if we've been stopped before we got started.
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 6d4cfba4fb8..4b14f68e461 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -949,7 +949,7 @@ private TaskStatus generateAndPublishSegments(
 
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
 
     try (
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
index ae1addfe63e..8ad4ec28f00 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
@@ -382,7 +382,7 @@ private void publish(TaskToolbox toolbox) throws IOException
   {
     final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
       final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
-      return toolbox.getTaskActionClient().submit(action).isSuccess();
+      return toolbox.getTaskActionClient().submit(action);
     };
     final UsedSegmentChecker usedSegmentChecker = new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient());
     final Set<DataSegment> segmentsToPublish = segmentsMap
@@ -390,7 +390,7 @@ private void publish(TaskToolbox toolbox) throws IOException
         .stream()
         .flatMap(report -> report.getSegments().stream())
         .collect(Collectors.toSet());
-    final boolean published = publisher.publishSegments(segmentsToPublish, null);
+    final boolean published = publisher.publishSegments(segmentsToPublish, null).isSuccess();
 
     if (published) {
       log.info("Published segments");
diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 0a7acaea159..a90ca26bbf5 100644
--- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -50,13 +50,14 @@
 
   /**
   * Get all used segments and the created_date of these segments in a given datasource and interval
-   * 
+   *
    * @param dataSource The datasource to query
   * @param interval   The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive
+   *
   * @return The DataSegments and the related created_date of segments which include data in the requested interval
    */
  List<Pair<DataSegment, String>> getUsedSegmentAndCreatedDateForInterval(String dataSource, Interval interval);
-  
+
   /**
   * Get all segments which may include any data in the interval and are flagged as used.
    *
@@ -134,9 +135,12 @@ SegmentIdentifier allocatePendingSegment(
   *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
    *                      involve a metadata transaction
    *
-   * @return segment publish result indicating transaction success or failure, and set of segments actually published
+   * @return segment publish result indicating transaction success or failure, and set of segments actually published.
+   * This method must only return a failure code if it is sure that the transaction did not happen. If it is not sure,
+   * it must throw an exception instead.
    *
   * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
+   * @throws RuntimeException         if the state of metadata storage after this call is unknown
    */
   SegmentPublishResult announceHistoricalSegments(
       Set<DataSegment> segments,
@@ -177,7 +181,7 @@ SegmentPublishResult announceHistoricalSegments(
    * @return true if the entry was inserted, false otherwise
    */
  boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata);
-  
+
   void updateSegmentMetadata(Set<DataSegment> segments);
 
   void deleteSegments(Set<DataSegment> segments);
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 75e5d9a898f..82e17a13f4d 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -311,7 +311,7 @@ public SegmentPublishResult announceHistoricalSegments(
       }
     }
 
-    final AtomicBoolean txnFailure = new AtomicBoolean(false);
+    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
 
     try {
       return connector.retryTransaction(
@@ -323,6 +323,9 @@ public SegmentPublishResult inTransaction(
                 final TransactionStatus transactionStatus
             ) throws Exception
             {
+              // Set definitelyNotUpdated back to false upon retrying.
+              definitelyNotUpdated.set(false);
+
               final Set<DataSegment> inserted = Sets.newHashSet();
 
               if (startMetadata != null) {
@@ -334,8 +337,9 @@ public SegmentPublishResult inTransaction(
                 );
 
                 if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+                  // Metadata was definitely not updated.
                   transactionStatus.setRollbackOnly();
-                  txnFailure.set(true);
+                  definitelyNotUpdated.set(true);
 
                   if (result == DataSourceMetadataUpdateResult.FAILURE) {
                     throw new RuntimeException("Aborting transaction!");
@@ -359,9 +363,10 @@ public SegmentPublishResult inTransaction(
       );
     }
     catch (CallbackFailedException e) {
-      if (txnFailure.get()) {
-        return new SegmentPublishResult(ImmutableSet.of(), false);
+      if (definitelyNotUpdated.get()) {
+        return SegmentPublishResult.fail();
       } else {
+        // Must throw exception if we are not sure if we updated or not.
         throw e;
       }
     }
@@ -890,7 +895,12 @@ public DataSourceMetadata getDataSourceMetadata(final String dataSource)
   * @param endMetadata   dataSource metadata post-insert will have this endMetadata merged in with
    *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}
    *
-   * @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
+   * @return SUCCESS if dataSource metadata was updated from matching startMetadata to matching endMetadata, FAILURE or
+   * TRY_AGAIN if it definitely was not updated. This guarantee is meant to help
+   * {@link #announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
+   * achieve its own guarantee.
+   *
+   * @throws RuntimeException if state is unknown after this call
    */
   protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(
       final Handle handle,
@@ -1163,29 +1173,31 @@ private void updatePayload(final Handle handle, final DataSegment segment) throw
         handle -> handle.createQuery(
             StringUtils.format(
                 "SELECT created_date, payload FROM %1$s WHERE dataSource = 
:dataSource " +
-                    "AND start >= :start AND %2$send%2$s <= :end AND used = 
true",
+                "AND start >= :start AND %2$send%2$s <= :end AND used = true",
                 dbTables.getSegmentsTable(), connector.getQuoteString()
             )
         )
-            .bind("dataSource", dataSource)
-            .bind("start", interval.getStart().toString())
-            .bind("end", interval.getEnd().toString())
-            .map(new ResultSetMapper<Pair<DataSegment, String>>()
-            {
-              @Override
-              public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx) throws SQLException
-              {
-                try {
-                  return new Pair<>(
-                      jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
-                      r.getString("created_date"));
-                }
-                catch (IOException e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            })
-            .list()
+                        .bind("dataSource", dataSource)
+                        .bind("start", interval.getStart().toString())
+                        .bind("end", interval.getEnd().toString())
+                        .map(new ResultSetMapper<Pair<DataSegment, String>>()
+                        {
+                          @Override
+                          public Pair<DataSegment, String> map(int index, ResultSet r, StatementContext ctx)
+                              throws SQLException
+                          {
+                            try {
+                              return new Pair<>(
+                                  jsonMapper.readValue(r.getBytes("payload"), DataSegment.class),
+                                  r.getString("created_date")
+                              );
+                            }
+                            catch (IOException e) {
+                              throw new RuntimeException(e);
+                            }
+                          }
+                        })
+                        .list()
     );
   }
 
@@ -1197,7 +1209,7 @@ public boolean insertDataSourceMetadata(String dataSource, DataSourceMetadata me
             .createStatement(
                 StringUtils.format(
                     "INSERT INTO %s (dataSource, created_date, 
commit_metadata_payload, commit_metadata_sha1) VALUES" +
-                        " (:dataSource, :created_date, 
:commit_metadata_payload, :commit_metadata_sha1)",
+                    " (:dataSource, :created_date, :commit_metadata_payload, 
:commit_metadata_sha1)",
                     dbTables.getDataSourceTable()
                 )
             )
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 5b79626880d..4bb0857843b 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -691,8 +691,15 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink
     try {
       if (descriptorFile.exists()) {
         // Already pushed.
-        log.info("Segment[%s] already pushed.", identifier);
-        return objectMapper.readValue(descriptorFile, DataSegment.class);
+
+        if (useUniquePath) {
+          // Don't reuse the descriptor, because the caller asked for a unique path. Leave the old one as-is, since
+          // it might serve some unknown purpose.
+          log.info("Pushing segment[%s] again with new unique path.", identifier);
+        } else {
+          log.info("Segment[%s] already pushed.", identifier);
+          return objectMapper.readValue(descriptorFile, DataSegment.class);
+        }
       }
 
       log.info("Pushing merged index for segment[%s].", identifier);
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index a5a6d2071d2..43ec063c394 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -555,38 +555,33 @@ protected AppenderatorDriverAddResult append(
               final boolean published = publisher.publishSegments(
                   ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
                  metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata()
-              );
+              ).isSuccess();
 
               if (published) {
                 log.info("Published segments.");
               } else {
-                log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
+                log.info("Transaction failure while publishing segments, removing them from deep storage "
+                         + "and checking if someone else beat us to publishing.");
+
+                segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
+
                final Set<SegmentIdentifier> segmentsIdentifiers = segmentsAndMetadata
                     .getSegments()
                     .stream()
                     .map(SegmentIdentifier::fromDataSegment)
                     .collect(Collectors.toSet());
+
                if (usedSegmentChecker.findUsedSegments(segmentsIdentifiers)
                                      .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
-                  log.info(
-                      "Removing our segments from deep storage because someone else already published them: %s",
-                      segmentsAndMetadata.getSegments()
-                  );
-                  segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
                   log.info("Our segments really do exist, awaiting handoff.");
                 } else {
-                  throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
+                  throw new ISE("Failed to publish segments.");
                 }
               }
             }
             catch (Exception e) {
-              log.warn(
-                  "Removing segments from deep storage after failed publish: %s",
-                  segmentsAndMetadata.getSegments()
-              );
-              segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly);
-
+              // Must not remove segments here, we aren't sure if our transaction succeeded or not.
+              log.warn(e, "Failed publish, not removing segments: %s", segmentsAndMetadata.getSegments());
               throw Throwables.propagate(e);
             }
           }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index d49b1816211..9a30fbd2fcf 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -19,6 +19,7 @@
 
 package io.druid.segment.realtime.appenderator;
 
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
@@ -30,11 +31,14 @@
   /**
   * Publish segments, along with some commit metadata, in a single transaction.
    *
-   * @return true if segments were published, false if they were not published due to txn failure with the metadata
+   * @return publish result that indicates if segments were published or not. If it is unclear
+   * if the segments were published or not, this method must throw an exception. The behavior is similar to
+   * IndexerSQLMetadataStorageCoordinator's announceHistoricalSegments.
    *
    * @throws IOException if there was an I/O error when publishing
+   * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason
    */
-  boolean publishSegments(
+  SegmentPublishResult publishSegments(
       Set<DataSegment> segments,
       @Nullable Object commitMetadata
   ) throws IOException;
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
index 4ac34483176..c472f801621 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java
@@ -24,13 +24,14 @@
 import com.google.common.collect.ImmutableSet;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.granularity.Granularities;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence;
-import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
+import io.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentAllocator;
 import io.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -194,6 +195,6 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true);
   }
 }
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
index 4d512e11080..1c7abb1f84e 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java
@@ -239,8 +239,7 @@ public void testFailDuringPublish() throws Exception
   {
     expectedException.expect(ExecutionException.class);
     expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
-    expectedException.expectMessage(
-        "Failed to publish segments[[DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z, dataSource='foo', binaryVersion='0'}, DataSegment{size=0, shardSpec=NumberedShardSpec{partitionNum=0, partitions=0}, metrics=[], dimensions=[], version='abc123', loadSpec={}, interval=2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z, dataSource='foo', binaryVersion='0'}]]");
+    expectedException.expectMessage("Failed to publish segments.");
 
     testFailDuringPublishInternal(false);
   }
@@ -279,31 +278,34 @@ private void testFailDuringPublishInternal(boolean failWithException) throws Exc
      Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
     }
 
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
-
-    dataSegmentKiller.killQuietly(new DataSegment(
-        "foo",
-        Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
-        "abc123",
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(0, 0),
-        0,
-        0
-    ));
-    EasyMock.expectLastCall().once();
+    if (!failWithException) {
+      // Should only kill segments if there was _no_ exception.
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T00:00:00.000Z/2000-01-01T01:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+
+      dataSegmentKiller.killQuietly(new DataSegment(
+          "foo",
+          Intervals.of("2000-01-01T01:00:00.000Z/2000-01-01T02:00:00.000Z"),
+          "abc123",
+          ImmutableMap.of(),
+          ImmutableList.of(),
+          ImmutableList.of(),
+          new NumberedShardSpec(0, 0),
+          0,
+          0
+      ));
+      EasyMock.expectLastCall().once();
+    }
 
     EasyMock.replay(dataSegmentKiller);
 
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index acdbed52d69..b9c5e223438 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -30,6 +30,7 @@
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
@@ -53,6 +54,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -359,7 +361,7 @@ public void testIncrementalHandoff() throws Exception
 
   static TransactionalSegmentPublisher makeOkPublisher()
   {
-    return (segments, commitMetadata) -> true;
+    return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true);
   }
 
   static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException)
@@ -368,7 +370,7 @@ static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithExcept
       if (failWithException) {
         throw new RuntimeException("test");
       }
-      return false;
+      return SegmentPublishResult.fail();
     };
   }
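
For illustration only (not part of the diff above): a minimal sketch of the revised
TransactionalSegmentPublisher contract introduced by this PR, pieced together from
the hunks shown. The toolbox/action wiring mirrors the Kafka and index task changes;
the surrounding variables are assumed to exist as in those tasks.

    // Publishers now return a SegmentPublishResult rather than a boolean.
    // A failure result means the transaction definitely did not happen; if the
    // outcome is unknown, the publisher must throw instead of returning.
    final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
      final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments);
      return toolbox.getTaskActionClient().submit(action);
    };

    // Caller side: only a definite failure is treated as "not published", so
    // cleanup such as killing segments in deep storage stays safe to perform.
    final boolean published = publisher.publishSegments(segmentsToPublish, null).isSuccess();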
 


 
