This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e56c598  Fall back to the old coordinator API for checking segment handoff if new one is not supported (#6966)
e56c598 is described below

commit e56c598cc118a3a00b21675a8e8bed6c230e6fd7
Author: Jihoon Son <jihoon...@apache.org>
AuthorDate: Thu Jan 31 08:50:46 2019 -0800

    Fall back to the old coordinator API for checking segment handoff if new one is not supported (#6966)
---
 .../client/coordinator/CoordinatorClient.java      |  50 +++++-
 .../CoordinatorBasedSegmentHandoffNotifier.java    |  31 +++-
 ...CoordinatorBasedSegmentHandoffNotifierTest.java | 180 +++++++++++++++++++++
 3 files changed, 257 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 631c808..2a197cf 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -21,8 +21,8 @@ package org.apache.druid.client.coordinator;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
 import com.google.inject.Inject;
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -30,6 +30,10 @@ import org.apache.druid.java.util.http.client.response.FullResponseHolder;
 import org.apache.druid.query.SegmentDescriptor;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.List;
 
 public class CoordinatorClient
 {
@@ -46,7 +50,12 @@ public class CoordinatorClient
     this.druidLeaderClient = druidLeaderClient;
   }
 
-  public boolean isHandOffComplete(String dataSource, SegmentDescriptor 
descriptor)
+  /**
+   * Checks the given segment is handed off or not.
+   * It can return null if the HTTP call returns 404 which can happen during 
rolling update.
+   */
+  @Nullable
+  public Boolean isHandOffComplete(String dataSource, SegmentDescriptor 
descriptor)
   {
     try {
       FullResponseHolder response = druidLeaderClient.go(
@@ -62,6 +71,10 @@ public class CoordinatorClient
           )
       );
 
+      if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+        return null;
+      }
+
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
         throw new ISE(
             "Error while fetching serverView status[%s] content[%s]",
@@ -74,7 +87,38 @@ public class CoordinatorClient
       });
     }
     catch (Exception e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, 
Interval interval, boolean incompleteOk)
+  {
+    try {
+      FullResponseHolder response = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(HttpMethod.GET,
+                                        StringUtils.format(
+                                            
"/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
+                                            dataSource,
+                                            interval.toString().replace('/', 
'_'),
+                                            incompleteOk
+                                        ))
+      );
+
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while fetching serverView status[%s] content[%s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      return jsonMapper.readValue(
+          response.getContent(), new 
TypeReference<List<ImmutableSegmentLoadInfo>>()
+          {
+          }
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
index 028183f..bbee720 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java
@@ -19,13 +19,16 @@
 
 package org.apache.druid.segment.realtime.plumber;
 
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.server.coordination.DruidServerMetadata;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -90,7 +93,19 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
         Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = 
itr.next();
         SegmentDescriptor descriptor = entry.getKey();
         try {
-          if (coordinatorClient.isHandOffComplete(dataSource, descriptor)) {
+          Boolean handOffComplete = 
coordinatorClient.isHandOffComplete(dataSource, descriptor);
+          if (handOffComplete == null) {
+            log.warn(
+                "Failed to call the new coordinator API for checking segment 
handoff. Falling back to the old API"
+            );
+            final List<ImmutableSegmentLoadInfo> loadedSegments = 
coordinatorClient.fetchServerView(
+                dataSource,
+                descriptor.getInterval(),
+                true
+            );
+            handOffComplete = isHandOffComplete(loadedSegments, descriptor);
+          }
+          if (handOffComplete) {
             log.info("Segment Handoff complete for dataSource[%s] 
Segment[%s]", dataSource, descriptor);
             entry.getValue().lhs.execute(entry.getValue().rhs);
             itr.remove();
@@ -120,6 +135,20 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
     }
   }
 
+  static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, 
SegmentDescriptor descriptor)
+  {
+    for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) {
+      if 
(segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval())
+          && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum()
+             == descriptor.getPartitionNumber()
+          && 
segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0
+          && 
segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::segmentReplicatable))
 {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public void close()
   {
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
index 3fcac21..4715734 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java
@@ -19,16 +19,23 @@
 
 package org.apache.druid.segment.realtime.plumber;
 
+import com.google.common.collect.Sets;
+import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CoordinatorBasedSegmentHandoffNotifierTest
@@ -104,4 +111,177 @@ public class CoordinatorBasedSegmentHandoffNotifierTest
     Assert.assertTrue(callbackCalled.get());
     EasyMock.verify(coordinatorClient);
   }
+
+  @Test
+  public void testHandoffChecksForVersion()
+  {
+    Interval interval = Intervals.of(
+        "2011-04-01/2011-04-02"
+    );
+    Assert.assertFalse(
+        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 2),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v2", 2)
+        )
+    );
+
+    Assert.assertTrue(
+        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v2", 2),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+
+    Assert.assertTrue(
+        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 2),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+
+  }
+
+  @Test
+  public void testHandoffChecksForAssignableServer()
+  {
+    Interval interval = Intervals.of(
+        "2011-04-01/2011-04-02"
+    );
+    Assert.assertTrue(
+        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 2),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+
+    Assert.assertFalse(
+        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 2),
+                    Sets.newHashSet(createRealtimeServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+  }
+
+  @Test
+  public void testHandoffChecksForPartitionNumber()
+  {
+    Interval interval = Intervals.of(
+        "2011-04-01/2011-04-02"
+    );
+    Assert.assertTrue(
+        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 1),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 1)
+        )
+    );
+
+    Assert.assertFalse(
+        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(interval, "v1", 1),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(interval, "v1", 2)
+        )
+    );
+
+  }
+
+  @Test
+  public void testHandoffChecksForInterval()
+  {
+
+    Assert.assertFalse(
+        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 
1),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 
1)
+        )
+    );
+
+    Assert.assertTrue(
+        CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete(
+            Collections.singletonList(
+                new ImmutableSegmentLoadInfo(
+                    createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 
1),
+                    Sets.newHashSet(createHistoricalServerMetadata("a"))
+                )
+            ),
+            new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 
1)
+        )
+    );
+  }
+
+  private DruidServerMetadata createRealtimeServerMetadata(String name)
+  {
+    return createServerMetadata(name, ServerType.REALTIME);
+  }
+
+  private DruidServerMetadata createHistoricalServerMetadata(String name)
+  {
+    return createServerMetadata(name, ServerType.HISTORICAL);
+  }
+
+  private DruidServerMetadata createServerMetadata(String name, ServerType 
type)
+  {
+    return new DruidServerMetadata(
+        name,
+        name,
+        null,
+        10000,
+        type,
+        "tier",
+        1
+    );
+  }
+
+  private DataSegment createSegment(Interval interval, String version, int 
partitionNumber)
+  {
+    return new DataSegment(
+        "test_ds",
+        interval,
+        version,
+        null,
+        null,
+        null,
+        new NumberedShardSpec(partitionNumber, 100),
+        0, 0
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to