[bookkeeper] branch master updated: [TABLE SERVICE] Intercepted storage server channel should shutdown the underlying managed channel

2018-10-01 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
 new 580d29e  [TABLE SERVICE] Intercepted storage server channel should 
shutdown the underlying managed channel
580d29e is described below

commit 580d29e47052fad058456c5028debe75701ea3a8
Author: Sijie Guo 
AuthorDate: Mon Oct 1 08:17:36 2018 -0700

[TABLE SERVICE] Intercepted storage server channel should shutdown the 
underlying managed channel

Descriptions of the changes in this PR:

*Motivation*

When a storage server channel is intercepted, the underlying channel will 
not be `ManagedChannel`
any more. so closing storage server channel will never close the underlying 
managed channel.

*Changes*

StorageServerChannel#intercept should keep reference to the original 
storage server channel.
so when closing the intercepted channel, it can close the original channel.




Author: Sijie Guo 

Reviewers: Jia Zhai 

This closes #1720 from sijie/close_intercepted_channel
---
 .../clients/impl/channel/StorageServerChannel.java | 24 +++---
 .../impl/channel/TestStorageServerChannel.java | 12 +++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index 76c7631..e5206de 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -58,6 +58,7 @@ public class StorageServerChannel implements AutoCloseable {
 
 private final Optional token;
 private final Channel channel;
+private final StorageServerChannel interceptedServerChannel;
 
 @GuardedBy("this")
 private RootRangeServiceFutureStub rootRangeService;
@@ -87,6 +88,11 @@ public class StorageServerChannel implements AutoCloseable {
 resolvedEndpoint.getPort())
 .usePlaintext(usePlainText)
 .build();
+this.interceptedServerChannel = null;
+}
+
+public Channel getGrpcChannel() {
+return channel;
 }
 
 @VisibleForTesting
@@ -97,8 +103,15 @@ public class StorageServerChannel implements AutoCloseable {
 
 protected StorageServerChannel(Channel channel,
Optional token) {
+this(channel, token, null);
+}
+
+private StorageServerChannel(Channel channel,
+ Optional token,
+ StorageServerChannel 
interceptedServerChannel) {
 this.token = token;
 this.channel = channel;
+this.interceptedServerChannel = interceptedServerChannel;
 }
 
 public synchronized RootRangeServiceFutureStub getRootRangeService() {
@@ -153,13 +166,18 @@ public class StorageServerChannel implements 
AutoCloseable {
 interceptors);
 return new StorageServerChannel(
 interceptedChannel,
-this.token);
+this.token,
+this);
 }
 
 @Override
 public void close() {
-if (channel instanceof ManagedChannel) {
-((ManagedChannel) channel).shutdown();
+if (interceptedServerChannel != null) {
+interceptedServerChannel.close();
+} else {
+if (channel instanceof ManagedChannel) {
+((ManagedChannel) channel).shutdown();
+}
 }
 }
 }
diff --git 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
index 1dd5f86..e126066 100644
--- 
a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
+++ 
b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/channel/TestStorageServerChannel.java
@@ -19,6 +19,9 @@
 package org.apache.bookkeeper.clients.impl.channel;
 
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import io.grpc.ManagedChannel;
 import io.grpc.Server;
@@ -68,4 +71,13 @@ public class TestStorageServerChannel {
 channel.close();
 }
 
+@Test
+public void testIntercept() {
+ManagedChannel channel = mock(ManagedChannel.class);
+StorageServerChannel ssChannel = new StorageServerChannel(channel, 
Optional.empty());
+

[bookkeeper] branch master updated: [TABLE SERVICE] [STORAGE] add routing table for proxying table service requests

2018-10-01 Thread sijie
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
 new 3446c2c  [TABLE SERVICE] [STORAGE] add routing table for proxying 
table service requests
3446c2c is described below

commit 3446c2cdb9c49ab5f547657a5d12e92022b56721
Author: Sijie Guo 
AuthorDate: Mon Oct 1 11:12:05 2018 -0700

[TABLE SERVICE] [STORAGE] add routing table for proxying table service 
requests

Descriptions of the changes in this PR:

*Motivation*

In order to implement non-java clients and avoid the complexity in the 
client implementation.
We need to proxy a routing table in the server side for proxying table 
service requests.

*Changes*

Add routing table in the service side to proxy grpc requests to the right 
storage containers.




Author: Sijie Guo 

Reviewers: Jia Zhai , Enrico Olivelli 

This closes #1721 from sijie/add_routing_table
---
 .../impl/internal/ProtocolInternalUtils.java   |   2 +-
 .../stream/protocol/ProtocolConstants.java |  22 +++
 .../bookkeeper/stream/server/StorageServer.java|  18 +-
 .../storage/api/sc/StorageContainerRegistry.java   |   9 +
 stream/storage/impl/pom.xml|   7 +
 .../storage/StorageContainerStoreBuilder.java  |  17 ++
 .../storage/impl/StorageContainerStoreImpl.java|  96 +--
 .../storage/impl/routing/RangeRoutingTable.java|  38 
 .../impl/routing/RangeRoutingTableImpl.java| 100 +++
 .../routing/RoutingHeaderProxyInterceptor.java}| 133 +++---
 .../StorageContainerProxyChannelManager.java   |  39 +
 .../StorageContainerProxyChannelManagerImpl.java   |  60 +++
 .../stream/storage/impl/routing/package-info.java  |  23 +++
 .../impl/sc/StorageContainerRegistryImpl.java  |   7 +-
 .../storage/TestStorageContainerStoreBuilder.java  |  19 ++
 .../impl/TestStorageContainerStoreImpl.java|   3 +
 .../impl/routing/RangeRoutingTableImplTest.java| 191 +
 .../RoutingHeaderProxyInterceptorTest.java}|  19 +-
 ...torageContainerProxyChannelManagerImplTest.java | 111 
 19 files changed, 832 insertions(+), 82 deletions(-)

diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
index a89aaf7..abc9c6d 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
@@ -49,7 +49,7 @@ public final class ProtocolInternalUtils {
 private ProtocolInternalUtils() {
 }
 
-static HashStreamRanges createActiveRanges(GetActiveRangesResponse 
response) {
+public static HashStreamRanges createActiveRanges(GetActiveRangesResponse 
response) {
 TreeMap ranges = Maps.newTreeMap();
 long lastEndKey = Long.MIN_VALUE;
 for (RelatedRanges rr : response.getRangesList()) {
diff --git 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 0088c4b..b888b45 100644
--- 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++ 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -19,6 +19,8 @@
 package org.apache.bookkeeper.stream.protocol;
 
 import io.grpc.Metadata;
+import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
 import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy;
 import org.apache.bookkeeper.stream.proto.RangeKeyType;
 import org.apache.bookkeeper.stream.proto.RetentionPolicy;
@@ -115,4 +117,24 @@ public final class ProtocolConstants {
 public static final String ROUTING_KEY = "rk" + 
Metadata.BINARY_HEADER_SUFFIX;
 public static final String STREAM_ID_KEY = "sid-" + 
Metadata.BINARY_HEADER_SUFFIX;
 public static final String RANGE_ID_KEY = "rid-" + 
Metadata.BINARY_HEADER_SUFFIX;
+
+// the metadata keys in grpc call metadata
+public static final Metadata.Key SCID_METADATA_KEY = Metadata.Key.of(
+SC_ID_KEY,
+LongBinaryMarshaller.of()
+);
+public static final Metadata.Key RID_METADATA_KEY = Metadata.Key.of(
+RANGE_ID_KEY,
+LongBinaryMarshaller.of()
+);
+public static final Metadata.Key SID_METADATA_KEY = Metadata.Key.of(
+STREAM_ID_KEY,
+LongBinaryMarshaller.of()
+);
+public static final Metadata.Key