This is an automated email from the ASF dual-hosted git repository. fjy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new e56c598 Fall back to the old coordinator API for checking segment handoff if new one is not supported (#6966) e56c598 is described below commit e56c598cc118a3a00b21675a8e8bed6c230e6fd7 Author: Jihoon Son <jihoon...@apache.org> AuthorDate: Thu Jan 31 08:50:46 2019 -0800 Fall back to the old coordinator API for checking segment handoff if new one is not supported (#6966) --- .../client/coordinator/CoordinatorClient.java | 50 +++++- .../CoordinatorBasedSegmentHandoffNotifier.java | 31 +++- ...CoordinatorBasedSegmentHandoffNotifierTest.java | 180 +++++++++++++++++++++ 3 files changed, 257 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index 631c808..2a197cf 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -21,8 +21,8 @@ package org.apache.druid.client.coordinator; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; import com.google.inject.Inject; +import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -30,6 +30,10 @@ import org.apache.druid.java.util.http.client.response.FullResponseHolder; import org.apache.druid.query.SegmentDescriptor; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.List; public class CoordinatorClient { @@ -46,7 +50,12 @@ public class CoordinatorClient this.druidLeaderClient = druidLeaderClient; } - public boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor) + /** + * Checks the given segment is handed off or not. + * It can return null if the HTTP call returns 404 which can happen during rolling update. + */ + @Nullable + public Boolean isHandOffComplete(String dataSource, SegmentDescriptor descriptor) { try { FullResponseHolder response = druidLeaderClient.go( @@ -62,6 +71,10 @@ public class CoordinatorClient ) ); + if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) { + return null; + } + if (!response.getStatus().equals(HttpResponseStatus.OK)) { throw new ISE( "Error while fetching serverView status[%s] content[%s]", @@ -74,7 +87,38 @@ public class CoordinatorClient }); } catch (Exception e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); + } + } + + public List<ImmutableSegmentLoadInfo> fetchServerView(String dataSource, Interval interval, boolean incompleteOk) + { + try { + FullResponseHolder response = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.GET, + StringUtils.format( + "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s", + dataSource, + interval.toString().replace('/', '_'), + incompleteOk + )) + ); + + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while fetching serverView status[%s] content[%s]", + response.getStatus(), + response.getContent() + ); + } + return jsonMapper.readValue( + response.getContent(), new TypeReference<List<ImmutableSegmentLoadInfo>>() + { + } + ); + } + catch (Exception e) { + throw new RuntimeException(e); } } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java index 028183f..bbee720 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifier.java @@ -19,13 +19,16 @@ package org.apache.druid.segment.realtime.plumber; +import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.server.coordination.DruidServerMetadata; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -90,7 +93,19 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next(); SegmentDescriptor descriptor = entry.getKey(); try { - if (coordinatorClient.isHandOffComplete(dataSource, descriptor)) { + Boolean handOffComplete = coordinatorClient.isHandOffComplete(dataSource, descriptor); + if (handOffComplete == null) { + log.warn( + "Failed to call the new coordinator API for checking segment handoff. Falling back to the old API" + ); + final List<ImmutableSegmentLoadInfo> loadedSegments = coordinatorClient.fetchServerView( + dataSource, + descriptor.getInterval(), + true + ); + handOffComplete = isHandOffComplete(loadedSegments, descriptor); + } + if (handOffComplete) { log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor); entry.getValue().lhs.execute(entry.getValue().rhs); itr.remove(); @@ -120,6 +135,20 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot } } + static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, SegmentDescriptor descriptor) + { + for (ImmutableSegmentLoadInfo segmentLoadInfo : serverView) { + if (segmentLoadInfo.getSegment().getInterval().contains(descriptor.getInterval()) + && segmentLoadInfo.getSegment().getShardSpec().getPartitionNum() + == descriptor.getPartitionNumber() + && segmentLoadInfo.getSegment().getVersion().compareTo(descriptor.getVersion()) >= 0 + && segmentLoadInfo.getServers().stream().anyMatch(DruidServerMetadata::segmentReplicatable)) { + return true; + } + } + return false; + } + @Override public void close() { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java index 3fcac21..4715734 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/CoordinatorBasedSegmentHandoffNotifierTest.java @@ -19,16 +19,23 @@ package org.apache.druid.segment.realtime.plumber; +import com.google.common.collect.Sets; +import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.joda.time.Duration; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; public class CoordinatorBasedSegmentHandoffNotifierTest @@ -104,4 +111,177 @@ public class CoordinatorBasedSegmentHandoffNotifierTest Assert.assertTrue(callbackCalled.get()); EasyMock.verify(coordinatorClient); } + + @Test + public void testHandoffChecksForVersion() + { + Interval interval = Intervals.of( + "2011-04-01/2011-04-02" + ); + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v2", 2) + ) + ); + + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v2", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + } + + @Test + public void testHandoffChecksForAssignableServer() + { + Interval interval = Intervals.of( + "2011-04-01/2011-04-02" + ); + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 2), + Sets.newHashSet(createRealtimeServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + } + + @Test + public void testHandoffChecksForPartitionNumber() + { + Interval interval = Intervals.of( + "2011-04-01/2011-04-02" + ); + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 1) + ) + ); + + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(interval, "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(interval, "v1", 2) + ) + ); + + } + + @Test + public void testHandoffChecksForInterval() + { + + Assert.assertFalse( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(Intervals.of("2011-04-01/2011-04-02"), "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(Intervals.of("2011-04-01/2011-04-03"), "v1", 1) + ) + ); + + Assert.assertTrue( + CoordinatorBasedSegmentHandoffNotifier.isHandOffComplete( + Collections.singletonList( + new ImmutableSegmentLoadInfo( + createSegment(Intervals.of("2011-04-01/2011-04-04"), "v1", 1), + Sets.newHashSet(createHistoricalServerMetadata("a")) + ) + ), + new SegmentDescriptor(Intervals.of("2011-04-02/2011-04-03"), "v1", 1) + ) + ); + } + + private DruidServerMetadata createRealtimeServerMetadata(String name) + { + return createServerMetadata(name, ServerType.REALTIME); + } + + private DruidServerMetadata createHistoricalServerMetadata(String name) + { + return createServerMetadata(name, ServerType.HISTORICAL); + } + + private DruidServerMetadata createServerMetadata(String name, ServerType type) + { + return new DruidServerMetadata( + name, + name, + null, + 10000, + type, + "tier", + 1 + ); + } + + private DataSegment createSegment(Interval interval, String version, int partitionNumber) + { + return new DataSegment( + "test_ds", + interval, + version, + null, + null, + null, + new NumberedShardSpec(partitionNumber, 100), + 0, 0 + ); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org