[bookkeeper] branch master updated: [TABLE SERVICE] Intercepted storage server channel should shutdown the underlying managed channel
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git The following commit(s) were added to refs/heads/master by this push: new 580d29e [TABLE SERVICE] Intercepted storage server channel should shutdown the underlying managed channel 580d29e is described below commit 580d29e47052fad058456c5028debe75701ea3a8 Author: Sijie Guo AuthorDate: Mon Oct 1 08:17:36 2018 -0700 [TABLE SERVICE] Intercepted storage server channel should shutdown the underlying managed channel Descriptions of the changes in this PR: *Motivation* When a storage server channel is intercepted, the underlying channel will not be `ManagedChannel` any more. so closing storage server channel will never close the underlying managed channel. *Changes* StorageServerChannel#intercept should keep reference to the original storage server channel. so when closing the intercepted channel, it can close the original channel. Author: Sijie Guo Reviewers: Jia Zhai This closes #1720 from sijie/close_intercepted_channel --- .../clients/impl/channel/StorageServerChannel.java | 24 +++--- .../impl/channel/TestStorageServerChannel.java | 12 +++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java index 76c7631..e5206de 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java @@ -58,6 +58,7 @@ public class StorageServerChannel implements AutoCloseable { private final Optional token; private final Channel channel; +private final StorageServerChannel interceptedServerChannel; @GuardedBy("this") private RootRangeServiceFutureStub rootRangeService; @@ -87,6 +88,11 @@ public class StorageServerChannel implements AutoCloseable { resolvedEndpoint.getPort()) .usePlaintext(usePlainText) .build(); +this.interceptedServerChannel = null; +} + +public Channel getGrpcChannel() { +return channel; } @VisibleForTesting @@ -97,8 +103,15 @@ public class StorageServerChannel implements AutoCloseable { protected StorageServerChannel(Channel channel, Optional token) { +this(channel, token, null); +} + +private StorageServerChannel(Channel channel, + Optional token, + StorageServerChannel interceptedServerChannel) { this.token = token; this.channel = channel; +this.interceptedServerChannel = interceptedServerChannel; } public synchronized RootRangeServiceFutureStub getRootRangeService() { @@ -153,13 +166,18 @@ public class StorageServerChannel implements AutoCloseable { interceptors); return new StorageServerChannel( interceptedChannel, -this.token); +this.token, +this); } @Override public void close() { -if (channel instanceof ManagedChannel) { -((ManagedChannel) channel).shutdown(); +if (interceptedServerChannel != null) { +interceptedServerChannel.close(); +} else { +if (channel instanceof ManagedChannel) { +((ManagedChannel) channel).shutdown(); +} } } } diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java index 1dd5f86..e126066 100644 --- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java +++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java @@ -19,6 +19,9 @@ package org.apache.bookkeeper.clients.impl.channel; import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import io.grpc.ManagedChannel; import io.grpc.Server; @@ -68,4 +71,13 @@ public class TestStorageServerChannel { channel.close(); } +@Test +public void testIntercept() { +ManagedChannel channel = mock(ManagedChannel.class); +StorageServerChannel ssChannel = new StorageServerChannel(channel, Optional.empty()); +
[bookkeeper] branch master updated: [TABLE SERVICE] [STORAGE] add routing table for proxying table service requests
This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git The following commit(s) were added to refs/heads/master by this push: new 3446c2c [TABLE SERVICE] [STORAGE] add routing table for proxying table service requests 3446c2c is described below commit 3446c2cdb9c49ab5f547657a5d12e92022b56721 Author: Sijie Guo AuthorDate: Mon Oct 1 11:12:05 2018 -0700 [TABLE SERVICE] [STORAGE] add routing table for proxying table service requests Descriptions of the changes in this PR: *Motivation* In order to implement non-java clients and avoid the complexity in the client implementation. We need to proxy a routing table in the server side for proxying table service requests. *Changes* Add routing table in the service side to proxy grpc requests to the right storage containers. Author: Sijie Guo Reviewers: Jia Zhai , Enrico Olivelli This closes #1721 from sijie/add_routing_table --- .../impl/internal/ProtocolInternalUtils.java | 2 +- .../stream/protocol/ProtocolConstants.java | 22 +++ .../bookkeeper/stream/server/StorageServer.java| 18 +- .../storage/api/sc/StorageContainerRegistry.java | 9 + stream/storage/impl/pom.xml| 7 + .../storage/StorageContainerStoreBuilder.java | 17 ++ .../storage/impl/StorageContainerStoreImpl.java| 96 +-- .../storage/impl/routing/RangeRoutingTable.java| 38 .../impl/routing/RangeRoutingTableImpl.java| 100 +++ .../routing/RoutingHeaderProxyInterceptor.java}| 133 +++--- .../StorageContainerProxyChannelManager.java | 39 + .../StorageContainerProxyChannelManagerImpl.java | 60 +++ .../stream/storage/impl/routing/package-info.java | 23 +++ .../impl/sc/StorageContainerRegistryImpl.java | 7 +- .../storage/TestStorageContainerStoreBuilder.java | 19 ++ .../impl/TestStorageContainerStoreImpl.java| 3 + .../impl/routing/RangeRoutingTableImplTest.java| 191 + .../RoutingHeaderProxyInterceptorTest.java}| 19 +- ...torageContainerProxyChannelManagerImplTest.java | 111 19 files changed, 832 insertions(+), 82 deletions(-) diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java index a89aaf7..abc9c6d 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java @@ -49,7 +49,7 @@ public final class ProtocolInternalUtils { private ProtocolInternalUtils() { } -static HashStreamRanges createActiveRanges(GetActiveRangesResponse response) { +public static HashStreamRanges createActiveRanges(GetActiveRangesResponse response) { TreeMap ranges = Maps.newTreeMap(); long lastEndKey = Long.MIN_VALUE; for (RelatedRanges rr : response.getRangesList()) { diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java index 0088c4b..b888b45 100644 --- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java +++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java @@ -19,6 +19,8 @@ package org.apache.bookkeeper.stream.protocol; import io.grpc.Metadata; +import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller; +import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller; import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy; import org.apache.bookkeeper.stream.proto.RangeKeyType; import org.apache.bookkeeper.stream.proto.RetentionPolicy; @@ -115,4 +117,24 @@ public final class ProtocolConstants { public static final String ROUTING_KEY = "rk" + Metadata.BINARY_HEADER_SUFFIX; public static final String STREAM_ID_KEY = "sid-" + Metadata.BINARY_HEADER_SUFFIX; public static final String RANGE_ID_KEY = "rid-" + Metadata.BINARY_HEADER_SUFFIX; + +// the metadata keys in grpc call metadata +public static final Metadata.Key SCID_METADATA_KEY = Metadata.Key.of( +SC_ID_KEY, +LongBinaryMarshaller.of() +); +public static final Metadata.Key RID_METADATA_KEY = Metadata.Key.of( +RANGE_ID_KEY, +LongBinaryMarshaller.of() +); +public static final Metadata.Key SID_METADATA_KEY = Metadata.Key.of( +STREAM_ID_KEY, +LongBinaryMarshaller.of() +); +public static final Metadata.Key