This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3446c2c  [TABLE SERVICE] [STORAGE] add routing table for proxying 
table service requests
3446c2c is described below

commit 3446c2cdb9c49ab5f547657a5d12e92022b56721
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Mon Oct 1 11:12:05 2018 -0700

    [TABLE SERVICE] [STORAGE] add routing table for proxying table service 
requests
    
    Descriptions of the changes in this PR:
    
    *Motivation*
    
    In order to support non-Java clients and avoid complexity in the client
    implementation, we need a routing table on the server side for proxying
    table service requests.
    
    *Changes*
    
    Add a routing table on the server side to proxy gRPC requests to the right
    storage containers.
    
    
    
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>, Enrico Olivelli <eolive...@gmail.com>
    
    This closes #1721 from sijie/add_routing_table
---
 .../impl/internal/ProtocolInternalUtils.java       |   2 +-
 .../stream/protocol/ProtocolConstants.java         |  22 +++
 .../bookkeeper/stream/server/StorageServer.java    |  18 +-
 .../storage/api/sc/StorageContainerRegistry.java   |   9 +
 stream/storage/impl/pom.xml                        |   7 +
 .../storage/StorageContainerStoreBuilder.java      |  17 ++
 .../storage/impl/StorageContainerStoreImpl.java    |  96 +++++++++--
 .../storage/impl/routing/RangeRoutingTable.java    |  38 ++++
 .../impl/routing/RangeRoutingTableImpl.java        | 100 +++++++++++
 .../routing/RoutingHeaderProxyInterceptor.java}    | 133 +++++++-------
 .../StorageContainerProxyChannelManager.java       |  39 +++++
 .../StorageContainerProxyChannelManagerImpl.java   |  60 +++++++
 .../stream/storage/impl/routing/package-info.java  |  23 +++
 .../impl/sc/StorageContainerRegistryImpl.java      |   7 +-
 .../storage/TestStorageContainerStoreBuilder.java  |  19 ++
 .../impl/TestStorageContainerStoreImpl.java        |   3 +
 .../impl/routing/RangeRoutingTableImplTest.java    | 191 +++++++++++++++++++++
 .../RoutingHeaderProxyInterceptorTest.java}        |  19 +-
 ...torageContainerProxyChannelManagerImplTest.java | 111 ++++++++++++
 19 files changed, 832 insertions(+), 82 deletions(-)

diff --git 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
index a89aaf7..abc9c6d 100644
--- 
a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
+++ 
b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java
@@ -49,7 +49,7 @@ public final class ProtocolInternalUtils {
     private ProtocolInternalUtils() {
     }
 
-    static HashStreamRanges createActiveRanges(GetActiveRangesResponse 
response) {
+    public static HashStreamRanges createActiveRanges(GetActiveRangesResponse 
response) {
         TreeMap<Long, RangeProperties> ranges = Maps.newTreeMap();
         long lastEndKey = Long.MIN_VALUE;
         for (RelatedRanges rr : response.getRangesList()) {
diff --git 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
index 0088c4b..b888b45 100644
--- 
a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
+++ 
b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java
@@ -19,6 +19,8 @@
 package org.apache.bookkeeper.stream.protocol;
 
 import io.grpc.Metadata;
+import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
+import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
 import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy;
 import org.apache.bookkeeper.stream.proto.RangeKeyType;
 import org.apache.bookkeeper.stream.proto.RetentionPolicy;
@@ -115,4 +117,24 @@ public final class ProtocolConstants {
     public static final String ROUTING_KEY = "rk" + 
Metadata.BINARY_HEADER_SUFFIX;
     public static final String STREAM_ID_KEY = "sid-" + 
Metadata.BINARY_HEADER_SUFFIX;
     public static final String RANGE_ID_KEY = "rid-" + 
Metadata.BINARY_HEADER_SUFFIX;
+
+    // the metadata keys in grpc call metadata
+    public static final Metadata.Key<Long> SCID_METADATA_KEY = Metadata.Key.of(
+        SC_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    public static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
+        RANGE_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    public static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
+        STREAM_ID_KEY,
+        LongBinaryMarshaller.of()
+    );
+    public static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
+        ROUTING_KEY,
+        IdentityBinaryMarshaller.of()
+    );
+
+
 }
diff --git 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 97ff66a..4385d56 100644
--- 
a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ 
b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -25,6 +25,9 @@ import java.net.UnknownHostException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import 
org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
 import org.apache.bookkeeper.common.component.ComponentStarter;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
@@ -54,6 +57,7 @@ import 
org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
 import 
org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
 import 
org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
+import 
org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
 import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
@@ -249,6 +253,10 @@ public class StorageServer {
             dlConf,
             rootStatsLogger.scope("dlog"));
 
+        // client settings for the proxy channels
+        StorageClientSettings proxyClientSettings = 
StorageClientSettings.newBuilder()
+            .serviceUri("bk://localhost:" + grpcPort)
+            .build();
         // Create range (stream) store
         StorageContainerStoreBuilder storageContainerStoreBuilder = 
StorageContainerStoreBuilder.newBuilder()
             .withStatsLogger(rootStatsLogger.scope("storage"))
@@ -286,7 +294,15 @@ public class StorageServer {
                     () -> new DLCheckpointStore(dlNamespaceProvider.get()),
                     storageConf.getRangeStoreDirs(),
                     storageResources,
-                    storageConf.getServeReadOnlyTables()));
+                    storageConf.getServeReadOnlyTables()))
+            // with client manager for proxying grpc requests
+            .withStorageServerClientManager(() -> new 
StorageServerClientManagerImpl(
+                proxyClientSettings,
+                storageResources.scheduler(),
+                StorageServerChannel.factory(proxyClientSettings)
+                    // intercept the channel to attach routing header
+                    .andThen(channel -> channel.intercept(new 
RoutingHeaderProxyInterceptor()))
+            ));
         StorageService storageService = new StorageService(
             storageConf, storageContainerStoreBuilder, 
rootStatsLogger.scope("storage"));
 
diff --git 
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
 
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
index 57634d3..a051060 100644
--- 
a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
+++ 
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java
@@ -36,6 +36,15 @@ public interface StorageContainerRegistry extends 
AutoCloseable {
     StorageContainer getStorageContainer(long storageContainerId);
 
     /**
+     * Get the instance of storage container {@code storageContainerId}.
+     *
+     * @param storageContainerId storage container id
+     * @param defaultContainer the default container to return if the 
container doesn't exist in the registry
+     * @return the instance of the storage container.
+     */
+    StorageContainer getStorageContainer(long storageContainerId, 
StorageContainer defaultContainer);
+
+    /**
      * Start the storage container in this registry.
      *
      * @param scId storage container id
diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml
index 52cef7f..a2ce3e0 100644
--- a/stream/storage/impl/pom.xml
+++ b/stream/storage/impl/pom.xml
@@ -66,6 +66,13 @@
       <artifactId>stream-storage-tests-common</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>stream-storage-java-client-base</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
index 9c96139..cac363f 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java
@@ -17,6 +17,8 @@ package org.apache.bookkeeper.stream.storage;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.net.URI;
+import java.util.function.Supplier;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import 
org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
@@ -46,6 +48,7 @@ public final class StorageContainerStoreBuilder {
         StorageContainerPlacementPolicyImpl.of(1024);
     private MVCCStoreFactory mvccStoreFactory = null;
     private URI defaultBackendUri = null;
+    private Supplier<StorageServerClientManager> clientManagerSupplier;
 
     private StorageContainerStoreBuilder() {
     }
@@ -131,12 +134,25 @@ public final class StorageContainerStoreBuilder {
         return this;
     }
 
+    /**
+     * Supplier to provide client manager for proxying requests.
+     *
+     * @param clientManagerSupplier client manager supplier
+     * @return storage container store builder
+     */
+    public StorageContainerStoreBuilder withStorageServerClientManager(
+        Supplier<StorageServerClientManager> clientManagerSupplier) {
+        this.clientManagerSupplier = clientManagerSupplier;
+        return this;
+    }
+
     public StorageContainerStore build() {
         checkNotNull(scmFactory, "StorageContainerManagerFactory is not 
provided");
         checkNotNull(storeConf, "StorageConfiguration is not provided");
         checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided");
         checkNotNull(defaultBackendUri, "Default backend uri is not provided");
         checkNotNull(placementPolicyFactory, "Storage Container Placement 
Policy Factory is not provided");
+        checkNotNull(clientManagerSupplier, "Storage server client manager is 
not provided");
 
         RangeStoreServiceFactoryImpl serviceFactory = new 
RangeStoreServiceFactoryImpl(
             storeConf,
@@ -152,6 +168,7 @@ public final class StorageContainerStoreBuilder {
             storeConf,
             scmFactory,
             containerServiceFactory,
+            clientManagerSupplier.get(),
             statsLogger);
     }
 
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
index 56fcba2..1271e85 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java
@@ -15,15 +15,19 @@
 package org.apache.bookkeeper.stream.storage.impl;
 
 import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID;
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SCID_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
 
 import io.grpc.Channel;
 import io.grpc.Metadata;
 import io.grpc.ServerCall;
 import java.io.IOException;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.common.component.AbstractLifecycleComponent;
-import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
 import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager;
@@ -31,11 +35,15 @@ import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactor
 import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService;
 import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
+import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTable;
+import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTableImpl;
+import 
org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManager;
+import 
org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManagerImpl;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory;
 import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl;
 
 /**
- * KeyRange Service.
+ * Storage Container Store manages the containers and routes the requests 
accordingly.
  */
 public class StorageContainerStoreImpl
     extends AbstractLifecycleComponent<StorageConfiguration>
@@ -45,11 +53,14 @@ public class StorageContainerStoreImpl
     private final StorageContainerRegistryImpl scRegistry;
     private final StorageContainerManager scManager;
     private final StorageContainerServiceFactory serviceFactory;
-    private final Metadata.Key<Long> scIdKey;
+    private final StorageServerClientManager ssClientManager;
+    private final RangeRoutingTable routingTable;
+    private final StorageContainerProxyChannelManager proxyChannelManager;
 
     public StorageContainerStoreImpl(StorageConfiguration conf,
                                      StorageContainerManagerFactory 
managerFactory,
                                      StorageContainerServiceFactory 
serviceFactory,
+                                     StorageServerClientManager 
ssClientManager,
                                      StatsLogger statsLogger) {
         super("range-service", conf, statsLogger);
         this.scmFactory = managerFactory;
@@ -57,9 +68,14 @@ public class StorageContainerStoreImpl
             new DefaultStorageContainerFactory(serviceFactory));
         this.scManager = scmFactory.create(conf, scRegistry);
         this.serviceFactory = serviceFactory;
-        this.scIdKey = Metadata.Key.of(
-            SC_ID_KEY,
-            LongBinaryMarshaller.of());
+        this.ssClientManager = ssClientManager;
+        if (ssClientManager == null) {
+            this.proxyChannelManager = null;
+            this.routingTable = null;
+        } else {
+            this.proxyChannelManager = new 
StorageContainerProxyChannelManagerImpl(ssClientManager);
+            this.routingTable = new RangeRoutingTableImpl(ssClientManager);
+        }
     }
 
     @Override
@@ -83,6 +99,10 @@ public class StorageContainerStoreImpl
 
     @Override
     protected void doStop() {
+        // it doesn't have to be blocked at waiting closing proxy channels to 
be completed.
+        if (null != ssClientManager) {
+            this.ssClientManager.closeAsync();
+        }
         this.scManager.stop();
         this.scRegistry.close();
     }
@@ -97,17 +117,69 @@ public class StorageContainerStoreImpl
         return scRegistry.getStorageContainer(scId);
     }
 
+    StorageContainer getStorageContainer(long scId, StorageContainer 
defaultContainer) {
+        return scRegistry.getStorageContainer(scId, defaultContainer);
+    }
+
     //
     // Utils for proxies
     //
 
     @Override
     public Channel findChannel(ServerCall<?, ?> serverCall, Metadata headers) {
-        Long scId = headers.get(scIdKey);
-        if (null == scId) {
-            // use the invalid storage container id, so it will fail the 
request.
-            scId = INVALID_STORAGE_CONTAINER_ID;
+        Long scId = headers.get(SCID_METADATA_KEY);
+        if (null != scId) {
+            // if a request is sent directly to a container, then find the 
container
+            StorageContainer container = getStorageContainer(scId, null);
+            if (null != container) {
+                return container.getChannel();
+            } else {
+                if (scId == 0L && null != proxyChannelManager) {
+                    // root container, we attempt to always proxy the requests 
for root container
+                    Channel channel = 
proxyChannelManager.getStorageContainerChannel(0L);
+                    if (null != channel) {
+                        return channel;
+                    } else {
+                        // no channel found to proxy the request, fail the 
request with 404 container
+                        return 
getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
+                    }
+                } else {
+                    // no container is found and the scId is not the root 
container
+                    // then fail the request with 404 container
+                    return 
getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
+                }
+            }
+        } else {
+            // if a request is not sent directly to a container, then check if
+            // streamId + routingKey is attached in the header. if so, figure 
out
+            // which the range id and storage container id to route the 
request.
+            byte[] routingKey = headers.get(RK_METADATA_KEY);
+            Long streamId = headers.get(SID_METADATA_KEY);
+            if (null != routingKey && null != streamId && null != routingTable 
&& null != proxyChannelManager) {
+                RangeProperties rangeProps = routingTable.getRange(streamId, 
routingKey);
+                if (null != rangeProps) {
+                    long containerId = rangeProps.getStorageContainerId();
+                    long rangeId = rangeProps.getRangeId();
+                    // if we find the routing information, we can update the 
headers
+                    headers.put(SCID_METADATA_KEY, containerId);
+                    headers.put(RID_METADATA_KEY, rangeId);
+                    // if we find the container id, we can check whether the 
container is owned locally.
+                    // if so, forward the request to the container.
+                    StorageContainer container = 
getStorageContainer(containerId, null);
+                    if (null == container) {
+                        // the container doesn't belong to here, then find the 
channel to forward the request
+                        Channel channel = 
proxyChannelManager.getStorageContainerChannel(containerId);
+                        if (null != channel) {
+                            return channel;
+                        }
+                    } else {
+                        // we found the container exists in the registry to 
serve the request
+                        return container.getChannel();
+                    }
+                }
+            }
+            // there is no storage container id or routing key, then fail the 
request and ask client to retry.
+            return 
getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel();
         }
-        return getStorageContainer(scId).getChannel();
     }
 }
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
new file mode 100644
index 0000000..bb80142
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+
+/**
+ * A routing table for figuring the ranges to forward the requests.
+ */
+public interface RangeRoutingTable {
+
+    /**
+     * Get the range metadata for <tt>streamId</tt> and the routing key 
<tt>routingKey</tt>.
+     *
+     * @param streamId stream id
+     * @param routingKey routing key
+     * @return the range that serves this routing key
+     */
+    RangeProperties getRange(long streamId, byte[] routingKey);
+
+}
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
new file mode 100644
index 0000000..b9727c9
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.router.BytesHashRouter;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
+
+/**
+ * A default implementation of {@link RangeRoutingTable}.
+ */
+@Slf4j
+public class RangeRoutingTableImpl implements RangeRoutingTable {
+
+    private final StorageServerClientManager manager;
+    private final ConcurrentLongHashMap<RangeRouter<byte[]>> ranges;
+
+    // outstanding requests
+    private final 
ConcurrentLongHashMap<CompletableFuture<RangeRouter<byte[]>>> 
outstandingUpdates;
+
+    public RangeRoutingTableImpl(StorageServerClientManager manager) {
+        this.manager = manager;
+        this.ranges = new ConcurrentLongHashMap<>();
+        this.outstandingUpdates = new ConcurrentLongHashMap<>();
+    }
+
+    @VisibleForTesting
+    RangeRouter<byte[]> getRangeRouter(long streamId) {
+        return ranges.get(streamId);
+    }
+
+    @Override
+    public RangeProperties getRange(long streamId, byte[] routingKey) {
+        RangeRouter<byte[]> router = ranges.get(streamId);
+        if (null == router) {
+            // trigger to fetch stream metadata, but return `null` since
+            // the range router is not ready, let the client backoff and retry.
+            fetchStreamRanges(streamId);
+            return null;
+        } else {
+            return router.getRangeProperties(routingKey);
+        }
+    }
+
+    @VisibleForTesting
+    CompletableFuture<RangeRouter<byte[]>> getOutstandingFetchRequest(long 
streamId) {
+        return outstandingUpdates.get(streamId);
+    }
+
+    private void fetchStreamRanges(long streamId) {
+        if (null != outstandingUpdates.get(streamId)) {
+            // there is already an outstanding fetch request, do nothing
+            return;
+        }
+        final CompletableFuture<RangeRouter<byte[]>> newFetchFuture = new 
CompletableFuture<>();
+        if (null != outstandingUpdates.put(streamId, newFetchFuture)) {
+            // some one already triggers the fetch
+            return;
+        }
+        FutureUtils.proxyTo(
+            manager.openMetaRangeClient(streamId)
+                .thenCompose(metaRangeClient -> 
metaRangeClient.getActiveDataRanges())
+                .thenApply(hashStreamRanges -> {
+                    RangeRouter<byte[]> router = new 
RangeRouter<>(BytesHashRouter.of());
+                    router.setRanges(hashStreamRanges);
+                    return router;
+                })
+                .whenComplete((router, cause) -> {
+                    if (null == cause) {
+                        ranges.put(streamId, router);
+                    }
+                    outstandingUpdates.remove(streamId, newFetchFuture);
+                }),
+            newFetchFuture
+        );
+    }
+}
diff --git 
a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
similarity index 70%
rename from 
stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
rename to 
stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
index 2ef970f..377f6df 100644
--- 
a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.bookkeeper.clients.impl.kv.interceptors;
+package org.apache.bookkeeper.stream.storage.impl.routing;
 
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RANGE_ID_KEY;
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROUTING_KEY;
-import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.STREAM_ID_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
 
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.MessageLite;
@@ -34,6 +34,9 @@ import io.grpc.ClientInterceptor;
 import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.PooledByteBufAllocator;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -41,8 +44,6 @@ import java.util.HashMap;
 import java.util.Map;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller;
-import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
@@ -50,25 +51,13 @@ import 
org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.commons.codec.binary.Hex;
 
 /**
  * A client interceptor that intercepting kv rpcs to attach routing 
information.
  */
 @Slf4j
-public class RoutingHeaderClientInterceptor implements ClientInterceptor {
-
-    static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of(
-        RANGE_ID_KEY,
-        LongBinaryMarshaller.of()
-    );
-    static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of(
-        STREAM_ID_KEY,
-        LongBinaryMarshaller.of()
-    );
-    static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of(
-        ROUTING_KEY,
-        IdentityBinaryMarshaller.of()
-    );
+public class RoutingHeaderProxyInterceptor implements ClientInterceptor {
 
     /**
      * Table request mutator that mutates a table service rpc request to attach
@@ -175,53 +164,75 @@ public class RoutingHeaderClientInterceptor implements 
ClientInterceptor {
                                                                CallOptions 
callOptions,
                                                                Channel next) {
         if (log.isTraceEnabled()) {
-            log.trace("Intercepting method {}", method.getFullMethodName());
+            log.trace("Intercepting method {} : req marshaller = {}, resp 
marshaller = {}",
+                method.getFullMethodName(),
+                method.getRequestMarshaller(),
+                method.getResponseMarshaller());
         }
         InterceptorDescriptor<?> descriptor = 
kvRpcMethods.get(method.getFullMethodName());
-        if (null != descriptor) {
-            return new SimpleForwardingClientCall<ReqT, 
RespT>(next.newCall(method, callOptions)) {
-
-                private Long rid = null;
-                private Long sid = null;
-                private byte[] rk = null;
+        return new SimpleForwardingClientCall<ReqT, 
RespT>(next.newCall(method, callOptions)) {
 
-                @Override
-                public void start(Listener<RespT> responseListener, Metadata 
headers) {
-                    // capture routing information from headers
-                    sid = headers.get(SID_METADATA_KEY);
-                    rid = headers.get(RID_METADATA_KEY);
-                    rk  = headers.get(RK_METADATA_KEY);
-                    if (log.isTraceEnabled()) {
-                        log.trace("Intercepting request with header : sid = 
{}, rid = {}, rk = {}",
-                            sid, rid, rk);
-                    }
+            private Long rid = null;
+            private Long sid = null;
+            private byte[] rk = null;
 
-                    delegate().start(responseListener, headers);
+            @Override
+            public void start(Listener<RespT> responseListener, Metadata 
headers) {
+                // capture routing information from headers
+                sid = headers.get(SID_METADATA_KEY);
+                rid = headers.get(RID_METADATA_KEY);
+                rk  = headers.get(RK_METADATA_KEY);
+                if (log.isTraceEnabled()) {
+                    log.trace("Intercepting request with header : sid = {}, 
rid = {}, rk = {}",
+                        sid, rid, rk);
                 }
 
-                @Override
-                public void sendMessage(ReqT message) {
-                    ReqT interceptedMessage;
-                    if (null == rid || null == sid || null == rk) {
-                        // we don't have enough information to form the new 
routing header
-                        // so do nothing
-                        interceptedMessage = message;
-                    } else {
-                        interceptedMessage = interceptMessage(
-                            method,
-                            descriptor,
-                            message,
-                            sid,
-                            rid,
-                            rk
-                        );
-                    }
-                    delegate().sendMessage(interceptedMessage);
+                delegate().start(responseListener, headers);
+            }
+
+            @Override
+            public void sendMessage(ReqT message) {
+                ReqT interceptedMessage;
+                if (null == rid || null == sid || null == rk || null == 
descriptor) {
+                    // we don't have enough information to form the new 
routing header
+                    // we simply copy the bytes and generate a new message to 
forward
+                    // the request payload
+                    interceptedMessage = interceptMessage(method, message);
+                } else {
+                    interceptedMessage = interceptMessage(
+                        method,
+                        descriptor,
+                        message,
+                        sid,
+                        rid,
+                        rk
+                    );
                 }
-            };
-        } else {
-            return next.newCall(method, callOptions);
+                delegate().sendMessage(interceptedMessage);
+            }
+        };
+    }
+
+    private <ReqT, RespT> ReqT interceptMessage(MethodDescriptor<ReqT, RespT> 
method, ReqT message) {
+        InputStream is = method.getRequestMarshaller().stream(message);
+        int bytes;
+        try {
+            bytes = is.available();
+        } catch (IOException e) {
+            log.warn("Encountered exceptions in getting available bytes of 
message", e);
+            throw new RuntimeException("Encountered exception in intercepting 
message", e);
+        }
+        ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();
+        try {
+            buffer.writeBytes(is, bytes);
+        } catch (IOException e) {
+            log.warn("Encountered exceptions in transferring bytes to the 
buffer", e);
+            buffer.release();
+            throw new RuntimeException("Encountered exceptions in transferring 
bytes to the buffer", e);
         }
+        return method
+            .getRequestMarshaller()
+            .parse(new ByteBufInputStream(buffer, true));
     }
 
     private <ReqT, TableReqT extends MessageLite> ReqT interceptMessage(
@@ -237,7 +248,9 @@ public class RoutingHeaderClientInterceptor implements 
ClientInterceptor {
         } else {
             try {
                 return interceptTableRequest(method, descriptor, message, sid, 
rid, rk);
-            } catch (IOException ioe) {
+            } catch (Throwable t) {
+                log.error("Failed to intercept table request (sid = {}, rid = 
{}, rk = {}) : ",
+                    sid, rid, Hex.encodeHexString(rk), t);
                 return message;
             }
         }
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
new file mode 100644
index 0000000..deb054e
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import io.grpc.Channel;
+
+/**
+ * A manager that manages all the proxy channels to other storage containers.
+ */
+public interface StorageContainerProxyChannelManager {
+
+    /**
+     * Get the channel to storage container {@code scId}.
+     *
+     * <p>The method can return {@code null} if the channel is not ready.
+     *
+     * @param scId storage container id
+     * @return channel to the given storage container, or {@code null} if it can't connect
+     */
+    Channel getStorageContainerChannel(long scId);
+
+}
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
new file mode 100644
index 0000000..ed69e0f
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import io.grpc.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
+
+/**
+ * Default implementation of {@link StorageContainerProxyChannelManager}.
+ */
+public class StorageContainerProxyChannelManagerImpl implements 
StorageContainerProxyChannelManager {
+
+    // Ideally we would just talk to the location service directly.
+    // However, the storage container is currently separated from the actual services to keep a clean interface,
+    // so for now the proxy channel manager acts as a client that talks to its local server to
+    // get location-related information.
+    private final StorageServerClientManager ssClientManager;
+
+    public StorageContainerProxyChannelManagerImpl(StorageServerClientManager 
clientManager) {
+        this.ssClientManager = clientManager;
+    }
+
+    @Override
+    public Channel getStorageContainerChannel(long scId) {
+        StorageContainerChannel channel = ssClientManager.getStorageContainerChannel(scId);
+        // this will trigger creating the channel for the first time
+        CompletableFuture<StorageServerChannel> serverChannelFuture = channel.getStorageContainerChannelFuture();
+        // report "not ready" (null) for a pending, failed or cancelled future: never block here,
+        // and never let join() rethrow the failure of the underlying future to the caller
+        if (null == serverChannelFuture
+            || !serverChannelFuture.isDone()
+            || serverChannelFuture.isCompletedExceptionally()) {
+            return null;
+        }
+        // the future completed normally, so join() returns immediately without throwing
+        StorageServerChannel serverChannel = serverChannelFuture.join();
+        return null == serverChannel ? null : serverChannel.getGrpcChannel();
+    }
+
+}
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
new file mode 100644
index 0000000..5d4437c
--- /dev/null
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Classes related to grpc request routing.
+ */
+package org.apache.bookkeeper.stream.storage.impl.routing;
\ No newline at end of file
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
index 48330b7..160d9a8 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java
@@ -58,7 +58,12 @@ public class StorageContainerRegistryImpl implements 
StorageContainerRegistry {
 
     @Override
     public StorageContainer getStorageContainer(long storageContainerId) {
-        return containers.getOrDefault(storageContainerId, 
StorageContainer404.of());
+        return getStorageContainer(storageContainerId, 
StorageContainer404.of());
+    }
+
+    @Override
+    public StorageContainer getStorageContainer(long storageContainerId, 
StorageContainer defaultContainer) {
+        return containers.getOrDefault(storageContainerId, defaultContainer);
     }
 
     @Override
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
index 3d3a0ad..59d6a13 100644
--- 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java
@@ -18,6 +18,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.net.URI;
+import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
 import org.apache.bookkeeper.stream.storage.api.StorageContainerStore;
 import 
org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -47,6 +48,7 @@ public class TestStorageContainerStoreBuilder {
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(storeFactory)
             .withDefaultBackendUri(uri)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .build();
     }
 
@@ -57,6 +59,7 @@ public class TestStorageContainerStoreBuilder {
             
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(null)
             .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(uri)
             .build();
     }
@@ -68,6 +71,7 @@ public class TestStorageContainerStoreBuilder {
             .withStorageContainerManagerFactory(null)
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(uri)
             .build();
     }
@@ -79,6 +83,7 @@ public class TestStorageContainerStoreBuilder {
             
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(null)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(uri)
             .build();
     }
@@ -90,10 +95,23 @@ public class TestStorageContainerStoreBuilder {
             
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(null)
             .build();
     }
 
+    @Test(expected = NullPointerException.class)
+    public void testBuildStorageServerClientManager() {
+        StorageContainerStoreBuilder.newBuilder()
+            .withStorageConfiguration(mock(StorageConfiguration.class))
+            
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
+            .withStorageResources(StorageResources.create())
+            .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(null)
+            .withDefaultBackendUri(uri)
+            .build();
+    }
+
     @Test
     public void testBuild() {
         StorageContainerStore storageContainerStore = 
StorageContainerStoreBuilder.newBuilder()
@@ -101,6 +119,7 @@ public class TestStorageContainerStoreBuilder {
             
.withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class))
             .withStorageResources(StorageResources.create())
             .withRangeStoreFactory(storeFactory)
+            .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class))
             .withDefaultBackendUri(uri)
             .build();
         assertTrue(storageContainerStore instanceof StorageContainerStoreImpl);
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
index b9d5e69..6ae3658 100644
--- 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
@@ -85,6 +85,7 @@ import 
org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRange
 import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
 import 
org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
 import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.storage.StorageResources;
 import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
 import 
org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -133,6 +134,7 @@ public class TestStorageContainerStoreImpl {
         NamespaceConfiguration.newBuilder()
             .setDefaultStreamConf(DEFAULT_STREAM_CONF)
             .build();
+    private final StorageResources resources = StorageResources.create();
     private RangeStoreService mockRangeStoreService;
     private StorageContainerStoreImpl rangeStore;
     private Server server;
@@ -230,6 +232,7 @@ public class TestStorageContainerStoreImpl {
             (storeConf, rgRegistry)
                 -> new LocalStorageContainerManager(endpoint, storeConf, 
rgRegistry, 2),
             new 
RangeStoreContainerServiceFactoryImpl(rangeStoreServiceFactory),
+            null,
             NullStatsLogger.INSTANCE);
 
         rangeStore.start();
diff --git 
a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
new file mode 100644
index 0000000..8201ba7
--- /dev/null
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils;
+import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.common.router.BytesHashRouter;
+import org.apache.bookkeeper.stream.proto.RangeProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
+import 
org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
+import org.apache.bookkeeper.stream.proto.storage.RelationType;
+import 
org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
+import 
org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link RangeRoutingTableImpl}.
+ */
+public class RangeRoutingTableImplTest extends GrpcClientTestBase {
+
+    private final long scId = 1234L;
+    private final long streamId = 123456L;
+    private GetActiveRangesResponse getActiveRangesResponse;
+    private CompletableFuture<GetActiveRangesResponse> responseSupplier;
+    private StreamProperties props;
+    private List<RangeProperties> rangeProps;
+    private RangeRoutingTableImpl routingTable;
+    private RangeRouter<byte[]> rangeRouter;
+
+    @Override
+    protected void doSetup() throws Exception {
+        this.props = StreamProperties.newBuilder()
+            .setStorageContainerId(scId)
+            .setStreamId(streamId)
+            .setStreamName("metaclient-stream")
+            .setStreamConf(StreamConfiguration.newBuilder().build())
+            .build();
+        this.rangeProps = ProtoUtils.split(
+            streamId,
+            24,
+            23456L,
+            StorageContainerPlacementPolicyImpl.of(4)
+        );
+        final GetActiveRangesResponse.Builder getActiveRangesResponseBuilder = 
GetActiveRangesResponse.newBuilder();
+        for (RangeProperties range : rangeProps) {
+            RelatedRanges.Builder rrBuilder = RelatedRanges.newBuilder()
+                .setProps(range)
+                .setType(RelationType.PARENTS)
+                .addAllRelatedRanges(Collections.emptyList());
+            getActiveRangesResponseBuilder.addRanges(rrBuilder);
+        }
+        this.getActiveRangesResponse = getActiveRangesResponseBuilder
+            .setCode(StatusCode.SUCCESS)
+            .build();
+        RootRangeServiceImplBase rootRangeService = new 
RootRangeServiceImplBase() {
+            @Override
+            public void getStream(GetStreamRequest request,
+                                  StreamObserver<GetStreamResponse> 
responseObserver) {
+                responseObserver.onNext(GetStreamResponse.newBuilder()
+                    .setCode(StatusCode.SUCCESS)
+                    .setStreamProps(props)
+                    .build());
+                responseObserver.onCompleted();
+            }
+        };
+        serviceRegistry.addService(rootRangeService);
+
+        this.responseSupplier = new CompletableFuture<>();
+        // register a good meta range service
+        MetaRangeServiceImplBase metaRangeService = new 
MetaRangeServiceImplBase() {
+            @Override
+            public void getActiveRanges(GetActiveRangesRequest request,
+                                        
StreamObserver<GetActiveRangesResponse> responseObserver) {
+                try {
+                    responseObserver.onNext(responseSupplier.get());
+                    responseObserver.onCompleted();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    responseObserver.onError(e);
+                } catch (ExecutionException e) {
+                    responseObserver.onError(e);
+                }
+            }
+        };
+        serviceRegistry.addService(metaRangeService);
+
+        this.routingTable = new RangeRoutingTableImpl(serverManager);
+        this.rangeRouter = new RangeRouter<>(BytesHashRouter.of());
+        
this.rangeRouter.setRanges(ProtocolInternalUtils.createActiveRanges(getActiveRangesResponse));
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+    }
+
+    @Test(timeout = 60000)
+    public void testGetRange() throws Exception {
+        String key = "foo";
+        byte[] keyBytes = key.getBytes(UTF_8);
+        RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes);
+        // the first get returns null since nothing is cached yet
+        assertNull(rangeProps);
+        // the fetch request is outstanding
+        CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture =
+            routingTable.getOutstandingFetchRequest(streamId);
+        assertNotNull(outstandingFetchFuture);
+        assertFalse(outstandingFetchFuture.isDone());
+
+        // complete the response supplier, so the fetch request can complete to update the cache
+        responseSupplier.complete(getActiveRangesResponse);
+
+        // wait until the router is cached; the test timeout bounds this poll if the fetch never lands
+        while (null == routingTable.getRangeRouter(streamId)) {
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+
+        // if the router is created, it should return the cached router
+        rangeProps = routingTable.getRange(streamId, keyBytes);
+        assertNotNull(rangeProps);
+        assertEquals(rangeRouter.getRangeProperties(keyBytes), rangeProps);
+    }
+
+    @Test
+    public void testGetRangeException() throws Exception {
+        String key = "foo";
+        byte[] keyBytes = key.getBytes(UTF_8);
+        RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes);
+        // the first get returns null since nothing is cached yet
+        assertNull(rangeProps);
+        // the fetch request is outstanding
+        CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture =
+            routingTable.getOutstandingFetchRequest(streamId);
+        assertNotNull(outstandingFetchFuture);
+        assertFalse(outstandingFetchFuture.isDone());
+
+        // fail the response supplier, so the fetch request completes exceptionally
+        responseSupplier.completeExceptionally(new Exception("fetch failed"));
+
+        // wait until the fetch is done.
+        try {
+            outstandingFetchFuture.get();
+            fail("Fetch request should fail");
+        } catch (Exception e) {
+            // expected
+        }
+
+        // once the fetch is done, nothing should be cached and the 
outstanding fetch request should be removed
+        assertNull(routingTable.getRangeRouter(streamId));
+        assertNull(routingTable.getOutstandingFetchRequest(streamId));
+    }
+
+}
diff --git 
a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
similarity index 94%
rename from 
stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
rename to 
stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
index 745118b..43c060b 100644
--- 
a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java
+++ 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java
@@ -17,9 +17,12 @@
  * under the License.
  */
 
-package org.apache.bookkeeper.clients.impl.kv.interceptors;
+package org.apache.bookkeeper.stream.storage.impl.routing;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY;
+import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY;
 import static org.junit.Assert.assertEquals;
 
 import com.google.protobuf.ByteString;
@@ -54,10 +57,10 @@ import 
org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.junit.Test;
 
 /**
- * Unit test {@link RoutingHeaderClientInterceptor}.
+ * Unit test {@link RoutingHeaderProxyInterceptor}.
  */
 @Slf4j
-public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
+public class RoutingHeaderProxyInterceptorTest extends GrpcClientTestBase {
 
     private final long streamId = 1234L;
     private final long rangeId = 2345L;
@@ -136,11 +139,12 @@ public class RoutingHeaderClientInterceptorTest extends 
GrpcClientTestBase {
         };
         serviceRegistry.addService(tableService.bindService());
 
+
         this.channel = new StorageServerChannel(
             
InProcessChannelBuilder.forName(serverName).directExecutor().build(),
             Optional.empty()
         ).intercept(
-            new RoutingHeaderClientInterceptor(),
+            new RoutingHeaderProxyInterceptor(),
             new ClientInterceptor() {
                 @Override
                 public <ReqT, RespT> ClientCall<ReqT, RespT> 
interceptCall(MethodDescriptor<ReqT, RespT> method,
@@ -152,15 +156,15 @@ public class RoutingHeaderClientInterceptorTest extends 
GrpcClientTestBase {
                             log.info("Intercept the request with routing 
information : sid = {}, rid = {}, rk = {}",
                                 streamId, rangeId, new String(routingKey, 
UTF_8));
                             headers.put(
-                                
RoutingHeaderClientInterceptor.RID_METADATA_KEY,
+                                RID_METADATA_KEY,
                                 rangeId
                             );
                             headers.put(
-                                
RoutingHeaderClientInterceptor.SID_METADATA_KEY,
+                                SID_METADATA_KEY,
                                 streamId
                             );
                             headers.put(
-                                RoutingHeaderClientInterceptor.RK_METADATA_KEY,
+                                RK_METADATA_KEY,
                                 routingKey
                             );
                             delegate().start(responseListener, headers);
@@ -173,6 +177,7 @@ public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase {
 
     @Override
     protected void doTeardown() {
+        // Close the StorageServerChannel held by this test so it is not left open after teardown.
+        channel.close();
     }
 
     @Test
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java
new file mode 100644
index 0000000..bb94843
--- /dev/null
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.stream.storage.impl.routing;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import io.grpc.Channel;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import 
org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointRequest;
+import 
org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointResponse;
+import 
org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointRequest;
+import 
org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
+import 
org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceImplBase;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link StorageContainerProxyChannelManagerImpl}.
+ *
+ * <p>The test registers a fake storage container service whose reply is held back until the
+ * test releases it, then checks that the manager returns no channel before the reply and a
+ * channel after it.
+ */
+public class StorageContainerProxyChannelManagerImplTest extends GrpcClientTestBase {
+
+    // Upper bound for every wait in this test, so a broken lookup fails the test instead of hanging it.
+    private static final long WAIT_TIMEOUT_SECS = 30L;
+    // Pause between two successive polls of the channel manager.
+    private static final long POLL_INTERVAL_MS = 100L;
+
+    private final long scId = 1234L;
+    private StorageContainerProxyChannelManagerImpl proxyChannelManager;
+
+    @Override
+    protected void doSetup() throws Exception {
+        // serverManager is not declared in this class; presumably provided by GrpcClientTestBase.
+        this.proxyChannelManager = new StorageContainerProxyChannelManagerImpl(serverManager);
+    }
+
+    @Override
+    protected void doTeardown() throws Exception {
+        // NOTE(review): nothing is released here; confirm the channel manager needs no cleanup.
+    }
+
+    /**
+     * Verifies that {@code getStorageContainerChannel} returns {@code null} while the endpoint
+     * lookup is unanswered and a non-null channel once the lookup has been answered.
+     *
+     * @throws Exception if a wait is interrupted or the lookup request is not received within
+     *         {@link #WAIT_TIMEOUT_SECS} seconds
+     */
+    @Test
+    public void testGetStorageContainerChannel() throws Exception {
+        final CompletableFuture<GetStorageContainerEndpointRequest> receivedRequest = new CompletableFuture<>();
+        final CompletableFuture<GetStorageContainerEndpointResponse> responseSupplier = new CompletableFuture<>();
+        StorageContainerServiceImplBase scService = new StorageContainerServiceImplBase() {
+            @Override
+            public void getStorageContainerEndpoint(
+                    GetStorageContainerEndpointRequest request,
+                    StreamObserver<GetStorageContainerEndpointResponse> responseObserver) {
+                // publish the request first so the test can build a matching response from it
+                receivedRequest.complete(request);
+                try {
+                    // hold the reply back until the test completes responseSupplier
+                    responseObserver.onNext(responseSupplier.get());
+                    responseObserver.onCompleted();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    responseObserver.onError(e);
+                } catch (ExecutionException e) {
+                    responseObserver.onError(e);
+                }
+            }
+        };
+        serviceRegistry.addService(scService.bindService());
+
+        Channel channel = proxyChannelManager.getStorageContainerChannel(scId);
+        // if the location service doesn't respond, the channel will be null
+        assertNull(channel);
+
+        // complete the location service request; bounded so a lookup that is never sent fails the test
+        responseSupplier.complete(getResponse(receivedRequest.get(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)));
+
+        // poll until the manager exposes the channel, but never beyond the deadline
+        final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT_TIMEOUT_SECS);
+        channel = proxyChannelManager.getStorageContainerChannel(scId);
+        while (channel == null && System.nanoTime() - deadlineNanos < 0) {
+            TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MS);
+            channel = proxyChannelManager.getStorageContainerChannel(scId);
+        }
+        assertNotNull("storage container channel was not available after the endpoint lookup completed", channel);
+    }
+
+    /**
+     * Builds a successful lookup response holding one endpoint entry per entry in the request.
+     *
+     * <p>Each entry echoes the requested storage container id, carries the requested revision
+     * plus one, and uses {@code ENDPOINT} as its read-write endpoint.
+     *
+     * @param request the lookup request to answer
+     * @return a response with {@link StatusCode#SUCCESS} and one endpoint per requested container
+     */
+    private static GetStorageContainerEndpointResponse getResponse(GetStorageContainerEndpointRequest request) {
+        GetStorageContainerEndpointResponse.Builder respBuilder =
+            GetStorageContainerEndpointResponse.newBuilder();
+        respBuilder.setStatusCode(StatusCode.SUCCESS);
+        for (OneStorageContainerEndpointRequest oneReq : request.getRequestsList()) {
+            OneStorageContainerEndpointResponse oneResp = OneStorageContainerEndpointResponse.newBuilder()
+                .setEndpoint(StorageContainerEndpoint.newBuilder()
+                    .setStorageContainerId(oneReq.getStorageContainer())
+                    .setRevision(oneReq.getRevision() + 1)
+                    .setRwEndpoint(ENDPOINT))
+                .build();
+            respBuilder.addResponses(oneResp);
+        }
+        return respBuilder.build();
+    }
+
+}

Reply via email to