This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 3446c2c [TABLE SERVICE] [STORAGE] add routing table for proxying table service requests 3446c2c is described below commit 3446c2cdb9c49ab5f547657a5d12e92022b56721 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Mon Oct 1 11:12:05 2018 -0700 [TABLE SERVICE] [STORAGE] add routing table for proxying table service requests Descriptions of the changes in this PR: *Motivation* In order to implement non-Java clients and avoid complexity in the client implementation, we need a routing table on the server side for proxying table service requests. *Changes* Add a routing table on the server side to proxy gRPC requests to the right storage containers. Author: Sijie Guo <si...@apache.org> Reviewers: Jia Zhai <None>, Enrico Olivelli <eolive...@gmail.com> This closes #1721 from sijie/add_routing_table --- .../impl/internal/ProtocolInternalUtils.java | 2 +- .../stream/protocol/ProtocolConstants.java | 22 +++ .../bookkeeper/stream/server/StorageServer.java | 18 +- .../storage/api/sc/StorageContainerRegistry.java | 9 + stream/storage/impl/pom.xml | 7 + .../storage/StorageContainerStoreBuilder.java | 17 ++ .../storage/impl/StorageContainerStoreImpl.java | 96 +++++++++-- .../storage/impl/routing/RangeRoutingTable.java | 38 ++++ .../impl/routing/RangeRoutingTableImpl.java | 100 +++++++++++ .../routing/RoutingHeaderProxyInterceptor.java} | 133 +++++++------- .../StorageContainerProxyChannelManager.java | 39 +++++ .../StorageContainerProxyChannelManagerImpl.java | 60 +++++++ .../stream/storage/impl/routing/package-info.java | 23 +++ .../impl/sc/StorageContainerRegistryImpl.java | 7 +- .../storage/TestStorageContainerStoreBuilder.java | 19 ++ .../impl/TestStorageContainerStoreImpl.java | 3 + .../impl/routing/RangeRoutingTableImplTest.java | 191 +++++++++++++++++++++ .../RoutingHeaderProxyInterceptorTest.java} | 19 +- ...torageContainerProxyChannelManagerImplTest.java | 111 ++++++++++++ 19 files 
changed, 832 insertions(+), 82 deletions(-) diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java index a89aaf7..abc9c6d 100644 --- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java +++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/ProtocolInternalUtils.java @@ -49,7 +49,7 @@ public final class ProtocolInternalUtils { private ProtocolInternalUtils() { } - static HashStreamRanges createActiveRanges(GetActiveRangesResponse response) { + public static HashStreamRanges createActiveRanges(GetActiveRangesResponse response) { TreeMap<Long, RangeProperties> ranges = Maps.newTreeMap(); long lastEndKey = Long.MIN_VALUE; for (RelatedRanges rr : response.getRangesList()) { diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java index 0088c4b..b888b45 100644 --- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java +++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/ProtocolConstants.java @@ -19,6 +19,8 @@ package org.apache.bookkeeper.stream.protocol; import io.grpc.Metadata; +import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller; +import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller; import org.apache.bookkeeper.stream.proto.FixedRangeSplitPolicy; import org.apache.bookkeeper.stream.proto.RangeKeyType; import org.apache.bookkeeper.stream.proto.RetentionPolicy; @@ -115,4 +117,24 @@ public final class ProtocolConstants { public static final String ROUTING_KEY = "rk" + Metadata.BINARY_HEADER_SUFFIX; public static final String STREAM_ID_KEY = "sid-" + Metadata.BINARY_HEADER_SUFFIX; 
public static final String RANGE_ID_KEY = "rid-" + Metadata.BINARY_HEADER_SUFFIX; + + // the metadata keys in grpc call metadata + public static final Metadata.Key<Long> SCID_METADATA_KEY = Metadata.Key.of( + SC_ID_KEY, + LongBinaryMarshaller.of() + ); + public static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of( + RANGE_ID_KEY, + LongBinaryMarshaller.of() + ); + public static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of( + STREAM_ID_KEY, + LongBinaryMarshaller.of() + ); + public static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of( + ROUTING_KEY, + IdentityBinaryMarshaller.of() + ); + + } diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java index 97ff66a..4385d56 100644 --- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java +++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java @@ -25,6 +25,9 @@ import java.net.UnknownHostException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.clients.config.StorageClientSettings; +import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel; +import org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl; import org.apache.bookkeeper.common.component.ComponentStarter; import org.apache.bookkeeper.common.component.LifecycleComponent; import org.apache.bookkeeper.common.component.LifecycleComponentStack; @@ -54,6 +57,7 @@ import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl; import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector; import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore; +import 
org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor; import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController; import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl; import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager; @@ -249,6 +253,10 @@ public class StorageServer { dlConf, rootStatsLogger.scope("dlog")); + // client settings for the proxy channels + StorageClientSettings proxyClientSettings = StorageClientSettings.newBuilder() + .serviceUri("bk://localhost:" + grpcPort) + .build(); // Create range (stream) store StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder() .withStatsLogger(rootStatsLogger.scope("storage")) @@ -286,7 +294,15 @@ public class StorageServer { () -> new DLCheckpointStore(dlNamespaceProvider.get()), storageConf.getRangeStoreDirs(), storageResources, - storageConf.getServeReadOnlyTables())); + storageConf.getServeReadOnlyTables())) + // with client manager for proxying grpc requests + .withStorageServerClientManager(() -> new StorageServerClientManagerImpl( + proxyClientSettings, + storageResources.scheduler(), + StorageServerChannel.factory(proxyClientSettings) + // intercept the channel to attach routing header + .andThen(channel -> channel.intercept(new RoutingHeaderProxyInterceptor())) + )); StorageService storageService = new StorageService( storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage")); diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java index 57634d3..a051060 100644 --- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java +++ 
b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/sc/StorageContainerRegistry.java @@ -36,6 +36,15 @@ public interface StorageContainerRegistry extends AutoCloseable { StorageContainer getStorageContainer(long storageContainerId); /** + * Get the instance of storage container {@code storageContainerId}. + * + * @param storageContainerId storage container id + * @param defaultContainer the default container to return if the container doesn't exist in the registry + * @return the instance of the storage container. + */ + StorageContainer getStorageContainer(long storageContainerId, StorageContainer defaultContainer); + + /** * Start the storage container in this registry. * * @param scId storage container id diff --git a/stream/storage/impl/pom.xml b/stream/storage/impl/pom.xml index 52cef7f..a2ce3e0 100644 --- a/stream/storage/impl/pom.xml +++ b/stream/storage/impl/pom.xml @@ -66,6 +66,13 @@ <artifactId>stream-storage-tests-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.bookkeeper</groupId> + <artifactId>stream-storage-java-client-base</artifactId> + <version>${project.parent.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java index 9c96139..cac363f 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/StorageContainerStoreBuilder.java @@ -17,6 +17,8 @@ package org.apache.bookkeeper.stream.storage; import static com.google.common.base.Preconditions.checkNotNull; import java.net.URI; +import java.util.function.Supplier; +import 
org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy; @@ -46,6 +48,7 @@ public final class StorageContainerStoreBuilder { StorageContainerPlacementPolicyImpl.of(1024); private MVCCStoreFactory mvccStoreFactory = null; private URI defaultBackendUri = null; + private Supplier<StorageServerClientManager> clientManagerSupplier; private StorageContainerStoreBuilder() { } @@ -131,12 +134,25 @@ public final class StorageContainerStoreBuilder { return this; } + /** + * Supplier to provide client manager for proxying requests. + * + * @param clientManagerSupplier client manager supplier + * @return storage container store builder + */ + public StorageContainerStoreBuilder withStorageServerClientManager( + Supplier<StorageServerClientManager> clientManagerSupplier) { + this.clientManagerSupplier = clientManagerSupplier; + return this; + } + public StorageContainerStore build() { checkNotNull(scmFactory, "StorageContainerManagerFactory is not provided"); checkNotNull(storeConf, "StorageConfiguration is not provided"); checkNotNull(mvccStoreFactory, "MVCCStoreFactory is not provided"); checkNotNull(defaultBackendUri, "Default backend uri is not provided"); checkNotNull(placementPolicyFactory, "Storage Container Placement Policy Factory is not provided"); + checkNotNull(clientManagerSupplier, "Storage server client manager is not provided"); RangeStoreServiceFactoryImpl serviceFactory = new RangeStoreServiceFactoryImpl( storeConf, @@ -152,6 +168,7 @@ public final class StorageContainerStoreBuilder { storeConf, scmFactory, containerServiceFactory, + clientManagerSupplier.get(), statsLogger); } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java index 56fcba2..1271e85 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/StorageContainerStoreImpl.java @@ -15,15 +15,19 @@ package org.apache.bookkeeper.stream.storage.impl; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.INVALID_STORAGE_CONTAINER_ID; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SC_ID_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SCID_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY; import io.grpc.Channel; import io.grpc.Metadata; import io.grpc.ServerCall; import java.io.IOException; +import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; import org.apache.bookkeeper.common.component.AbstractLifecycleComponent; -import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stream.proto.RangeProperties; import org.apache.bookkeeper.stream.storage.api.StorageContainerStore; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManager; @@ -31,11 +35,15 @@ import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactor import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRoutingService; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory; import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; +import 
org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTable; +import org.apache.bookkeeper.stream.storage.impl.routing.RangeRoutingTableImpl; +import org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManager; +import org.apache.bookkeeper.stream.storage.impl.routing.StorageContainerProxyChannelManagerImpl; import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerFactory; import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerRegistryImpl; /** - * KeyRange Service. + * Storage Container Store manages the containers and routes the requests accordingly. */ public class StorageContainerStoreImpl extends AbstractLifecycleComponent<StorageConfiguration> @@ -45,11 +53,14 @@ public class StorageContainerStoreImpl private final StorageContainerRegistryImpl scRegistry; private final StorageContainerManager scManager; private final StorageContainerServiceFactory serviceFactory; - private final Metadata.Key<Long> scIdKey; + private final StorageServerClientManager ssClientManager; + private final RangeRoutingTable routingTable; + private final StorageContainerProxyChannelManager proxyChannelManager; public StorageContainerStoreImpl(StorageConfiguration conf, StorageContainerManagerFactory managerFactory, StorageContainerServiceFactory serviceFactory, + StorageServerClientManager ssClientManager, StatsLogger statsLogger) { super("range-service", conf, statsLogger); this.scmFactory = managerFactory; @@ -57,9 +68,14 @@ public class StorageContainerStoreImpl new DefaultStorageContainerFactory(serviceFactory)); this.scManager = scmFactory.create(conf, scRegistry); this.serviceFactory = serviceFactory; - this.scIdKey = Metadata.Key.of( - SC_ID_KEY, - LongBinaryMarshaller.of()); + this.ssClientManager = ssClientManager; + if (ssClientManager == null) { + this.proxyChannelManager = null; + this.routingTable = null; + } else { + this.proxyChannelManager = new StorageContainerProxyChannelManagerImpl(ssClientManager); + 
this.routingTable = new RangeRoutingTableImpl(ssClientManager); + } } @Override @@ -83,6 +99,10 @@ public class StorageContainerStoreImpl @Override protected void doStop() { + // it doesn't have to be blocked at waiting closing proxy channels to be completed. + if (null != ssClientManager) { + this.ssClientManager.closeAsync(); + } this.scManager.stop(); this.scRegistry.close(); } @@ -97,17 +117,69 @@ public class StorageContainerStoreImpl return scRegistry.getStorageContainer(scId); } + StorageContainer getStorageContainer(long scId, StorageContainer defaultContainer) { + return scRegistry.getStorageContainer(scId, defaultContainer); + } + // // Utils for proxies // @Override public Channel findChannel(ServerCall<?, ?> serverCall, Metadata headers) { - Long scId = headers.get(scIdKey); - if (null == scId) { - // use the invalid storage container id, so it will fail the request. - scId = INVALID_STORAGE_CONTAINER_ID; + Long scId = headers.get(SCID_METADATA_KEY); + if (null != scId) { + // if a request is sent directly to a container, then find the container + StorageContainer container = getStorageContainer(scId, null); + if (null != container) { + return container.getChannel(); + } else { + if (scId == 0L && null != proxyChannelManager) { + // root container, we attempt to always proxy the requests for root container + Channel channel = proxyChannelManager.getStorageContainerChannel(0L); + if (null != channel) { + return channel; + } else { + // no channel found to proxy the request, fail the request with 404 container + return getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel(); + } + } else { + // no container is found and the scId is not the root container + // then fail the request with 404 container + return getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel(); + } + } + } else { + // if a request is not sent directly to a container, then check if + // streamId + routingKey is attached in the header. 
if so, figure out + // which the range id and storage container id to route the request. + byte[] routingKey = headers.get(RK_METADATA_KEY); + Long streamId = headers.get(SID_METADATA_KEY); + if (null != routingKey && null != streamId && null != routingTable && null != proxyChannelManager) { + RangeProperties rangeProps = routingTable.getRange(streamId, routingKey); + if (null != rangeProps) { + long containerId = rangeProps.getStorageContainerId(); + long rangeId = rangeProps.getRangeId(); + // if we find the routing information, we can update the headers + headers.put(SCID_METADATA_KEY, containerId); + headers.put(RID_METADATA_KEY, rangeId); + // if we find the container id, we can check whether the container is owned locally. + // if so, forward the request to the container. + StorageContainer container = getStorageContainer(containerId, null); + if (null == container) { + // the container doesn't belong to here, then find the channel to forward the request + Channel channel = proxyChannelManager.getStorageContainerChannel(containerId); + if (null != channel) { + return channel; + } + } else { + // we found the container exists in the registry to serve the request + return container.getChannel(); + } + } + } + // there is no storage container id or routing key, then fail the request and ask client to retry. + return getStorageContainer(INVALID_STORAGE_CONTAINER_ID).getChannel(); } - return getStorageContainer(scId).getChannel(); } } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java new file mode 100644 index 0000000..bb80142 --- /dev/null +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import org.apache.bookkeeper.stream.proto.RangeProperties; + +/** + * A routing table for figuring the ranges to forward the requests. + */ +public interface RangeRoutingTable { + + /** + * Get the range metadata for <tt>streamId</tt> and the routing key <tt>routingKey</tt>. + * + * @param streamId stream id + * @param routingKey routing key + * @return the range that serves this routing key + */ + RangeProperties getRange(long streamId, byte[] routingKey); + +} diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java new file mode 100644 index 0000000..b9727c9 --- /dev/null +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImpl.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; +import org.apache.bookkeeper.clients.impl.routing.RangeRouter; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.common.router.BytesHashRouter; +import org.apache.bookkeeper.stream.proto.RangeProperties; +import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; + +/** + * A default implementation of {@link RangeRoutingTable}. 
+ */ +@Slf4j +public class RangeRoutingTableImpl implements RangeRoutingTable { + + private final StorageServerClientManager manager; + private final ConcurrentLongHashMap<RangeRouter<byte[]>> ranges; + + // outstanding requests + private final ConcurrentLongHashMap<CompletableFuture<RangeRouter<byte[]>>> outstandingUpdates; + + public RangeRoutingTableImpl(StorageServerClientManager manager) { + this.manager = manager; + this.ranges = new ConcurrentLongHashMap<>(); + this.outstandingUpdates = new ConcurrentLongHashMap<>(); + } + + @VisibleForTesting + RangeRouter<byte[]> getRangeRouter(long streamId) { + return ranges.get(streamId); + } + + @Override + public RangeProperties getRange(long streamId, byte[] routingKey) { + RangeRouter<byte[]> router = ranges.get(streamId); + if (null == router) { + // trigger to fetch stream metadata, but return `null` since + // the range router is not ready, let the client backoff and retry. + fetchStreamRanges(streamId); + return null; + } else { + return router.getRangeProperties(routingKey); + } + } + + @VisibleForTesting + CompletableFuture<RangeRouter<byte[]>> getOutstandingFetchRequest(long streamId) { + return outstandingUpdates.get(streamId); + } + + private void fetchStreamRanges(long streamId) { + if (null != outstandingUpdates.get(streamId)) { + // there is already an outstanding fetch request, do nothing + return; + } + final CompletableFuture<RangeRouter<byte[]>> newFetchFuture = new CompletableFuture<>(); + if (null != outstandingUpdates.put(streamId, newFetchFuture)) { + // some one already triggers the fetch + return; + } + FutureUtils.proxyTo( + manager.openMetaRangeClient(streamId) + .thenCompose(metaRangeClient -> metaRangeClient.getActiveDataRanges()) + .thenApply(hashStreamRanges -> { + RangeRouter<byte[]> router = new RangeRouter<>(BytesHashRouter.of()); + router.setRanges(hashStreamRanges); + return router; + }) + .whenComplete((router, cause) -> { + if (null == cause) { + ranges.put(streamId, router); + } + 
outstandingUpdates.remove(streamId, newFetchFuture); + }), + newFetchFuture + ); + } +} diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java similarity index 70% rename from stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java rename to stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java index 2ef970f..377f6df 100644 --- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptor.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptor.java @@ -17,11 +17,11 @@ * under the License. */ -package org.apache.bookkeeper.clients.impl.kv.interceptors; +package org.apache.bookkeeper.stream.storage.impl.routing; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RANGE_ID_KEY; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROUTING_KEY; -import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.STREAM_ID_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.MessageLite; @@ -34,6 +34,9 @@ import io.grpc.ClientInterceptor; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.PooledByteBufAllocator; import 
java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -41,8 +44,6 @@ import java.util.HashMap; import java.util.Map; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.common.grpc.netty.IdentityBinaryMarshaller; -import org.apache.bookkeeper.common.grpc.netty.LongBinaryMarshaller; import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest; import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest; import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest; @@ -50,25 +51,13 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest; import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader; import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc; import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest; +import org.apache.commons.codec.binary.Hex; /** * A client interceptor that intercepting kv rpcs to attach routing information. */ @Slf4j -public class RoutingHeaderClientInterceptor implements ClientInterceptor { - - static final Metadata.Key<Long> RID_METADATA_KEY = Metadata.Key.of( - RANGE_ID_KEY, - LongBinaryMarshaller.of() - ); - static final Metadata.Key<Long> SID_METADATA_KEY = Metadata.Key.of( - STREAM_ID_KEY, - LongBinaryMarshaller.of() - ); - static final Metadata.Key<byte[]> RK_METADATA_KEY = Metadata.Key.of( - ROUTING_KEY, - IdentityBinaryMarshaller.of() - ); +public class RoutingHeaderProxyInterceptor implements ClientInterceptor { /** * Table request mutator that mutates a table service rpc request to attach @@ -175,53 +164,75 @@ public class RoutingHeaderClientInterceptor implements ClientInterceptor { CallOptions callOptions, Channel next) { if (log.isTraceEnabled()) { - log.trace("Intercepting method {}", method.getFullMethodName()); + log.trace("Intercepting method {} : req marshaller = {}, resp marshaller = {}", + method.getFullMethodName(), + method.getRequestMarshaller(), + method.getResponseMarshaller()); } InterceptorDescriptor<?> 
descriptor = kvRpcMethods.get(method.getFullMethodName()); - if (null != descriptor) { - return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { - - private Long rid = null; - private Long sid = null; - private byte[] rk = null; + return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { - @Override - public void start(Listener<RespT> responseListener, Metadata headers) { - // capture routing information from headers - sid = headers.get(SID_METADATA_KEY); - rid = headers.get(RID_METADATA_KEY); - rk = headers.get(RK_METADATA_KEY); - if (log.isTraceEnabled()) { - log.trace("Intercepting request with header : sid = {}, rid = {}, rk = {}", - sid, rid, rk); - } + private Long rid = null; + private Long sid = null; + private byte[] rk = null; - delegate().start(responseListener, headers); + @Override + public void start(Listener<RespT> responseListener, Metadata headers) { + // capture routing information from headers + sid = headers.get(SID_METADATA_KEY); + rid = headers.get(RID_METADATA_KEY); + rk = headers.get(RK_METADATA_KEY); + if (log.isTraceEnabled()) { + log.trace("Intercepting request with header : sid = {}, rid = {}, rk = {}", + sid, rid, rk); } - @Override - public void sendMessage(ReqT message) { - ReqT interceptedMessage; - if (null == rid || null == sid || null == rk) { - // we don't have enough information to form the new routing header - // so do nothing - interceptedMessage = message; - } else { - interceptedMessage = interceptMessage( - method, - descriptor, - message, - sid, - rid, - rk - ); - } - delegate().sendMessage(interceptedMessage); + delegate().start(responseListener, headers); + } + + @Override + public void sendMessage(ReqT message) { + ReqT interceptedMessage; + if (null == rid || null == sid || null == rk || null == descriptor) { + // we don't have enough information to form the new routing header + // we simply copy the bytes and generate a new message to forward + // the 
request payload + interceptedMessage = interceptMessage(method, message); + } else { + interceptedMessage = interceptMessage( + method, + descriptor, + message, + sid, + rid, + rk + ); } - }; - } else { - return next.newCall(method, callOptions); + delegate().sendMessage(interceptedMessage); + } + }; + } + + private <ReqT, RespT> ReqT interceptMessage(MethodDescriptor<ReqT, RespT> method, ReqT message) { + InputStream is = method.getRequestMarshaller().stream(message); + int bytes; + try { + bytes = is.available(); + } catch (IOException e) { + log.warn("Encountered exceptions in getting available bytes of message", e); + throw new RuntimeException("Encountered exception in intercepting message", e); + } + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(); + try { + buffer.writeBytes(is, bytes); + } catch (IOException e) { + log.warn("Encountered exceptions in transferring bytes to the buffer", e); + buffer.release(); + throw new RuntimeException("Encountered exceptions in transferring bytes to the buffer", e); } + return method + .getRequestMarshaller() + .parse(new ByteBufInputStream(buffer, true)); } private <ReqT, TableReqT extends MessageLite> ReqT interceptMessage( @@ -237,7 +248,9 @@ public class RoutingHeaderClientInterceptor implements ClientInterceptor { } else { try { return interceptTableRequest(method, descriptor, message, sid, rid, rk); - } catch (IOException ioe) { + } catch (Throwable t) { + log.error("Failed to intercept table request (sid = {}, rid = {}, rk = {}) : ", + sid, rid, Hex.encodeHexString(rk), t); return message; } } diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java new file mode 100644 index 0000000..deb054e --- /dev/null +++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManager.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import io.grpc.Channel; + +/** + * A manager that manages all the proxy channels to other storage containers. + */ +public interface StorageContainerProxyChannelManager { + + /** + * Get the channel to storage container <tt>scId</tt>. + * + * <p>The method can return `null` if the channel is not ready. 
+ * + * @param scId storage container id + * @return channel to the given storage container, or `null` if it can't connect + */ + Channel getStorageContainerChannel(long scId); + +} diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java new file mode 100644 index 0000000..ed69e0f --- /dev/null +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImpl.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import io.grpc.Channel; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel; +import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel; +import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; + +/** + * Default implementation of {@link StorageContainerProxyChannelManager}. 
+ */ +public class StorageContainerProxyChannelManagerImpl implements StorageContainerProxyChannelManager { + + // we can ideally just talk to the location service directly. + // however currently storage container is separated from actual services to make a clean interface + // so for now proxy channel manager will be acting as a client to talk to its local server to + // get location related information + private final StorageServerClientManager ssClientManager; + + public StorageContainerProxyChannelManagerImpl(StorageServerClientManager clientManager) { + this.ssClientManager = clientManager; + } + + @Override + public Channel getStorageContainerChannel(long scId) { + StorageContainerChannel channel = ssClientManager.getStorageContainerChannel(scId); + // this will trigger creating the channel for the first time + CompletableFuture<StorageServerChannel> serverChannelFuture = channel.getStorageContainerChannelFuture(); + if (null != serverChannelFuture && serverChannelFuture.isDone()) { + StorageServerChannel serverChannel = serverChannelFuture.join(); + if (serverChannel != null) { + return serverChannel.getGrpcChannel(); + } else { + return null; + } + } else { + return null; + } + } + +} diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java new file mode 100644 index 0000000..5d4437c --- /dev/null +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/routing/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Classes related to grpc request routing. + */ +package org.apache.bookkeeper.stream.storage.impl.routing; \ No newline at end of file diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java index 48330b7..160d9a8 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerRegistryImpl.java @@ -58,7 +58,12 @@ public class StorageContainerRegistryImpl implements StorageContainerRegistry { @Override public StorageContainer getStorageContainer(long storageContainerId) { - return containers.getOrDefault(storageContainerId, StorageContainer404.of()); + return getStorageContainer(storageContainerId, StorageContainer404.of()); + } + + @Override + public StorageContainer getStorageContainer(long storageContainerId, StorageContainer defaultContainer) { + return containers.getOrDefault(storageContainerId, defaultContainer); } @Override diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java 
b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java index 3d3a0ad..59d6a13 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/TestStorageContainerStoreBuilder.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import java.net.URI; +import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager; import org.apache.bookkeeper.stream.storage.api.StorageContainerStore; import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerManagerFactory; import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; @@ -47,6 +48,7 @@ public class TestStorageContainerStoreBuilder { .withStorageResources(StorageResources.create()) .withRangeStoreFactory(storeFactory) .withDefaultBackendUri(uri) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .build(); } @@ -57,6 +59,7 @@ public class TestStorageContainerStoreBuilder { .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) .withStorageResources(null) .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .withDefaultBackendUri(uri) .build(); } @@ -68,6 +71,7 @@ public class TestStorageContainerStoreBuilder { .withStorageContainerManagerFactory(null) .withStorageResources(StorageResources.create()) .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .withDefaultBackendUri(uri) .build(); } @@ -79,6 +83,7 @@ public class TestStorageContainerStoreBuilder { .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) .withStorageResources(StorageResources.create()) .withRangeStoreFactory(null) + .withStorageServerClientManager(() -> 
mock(StorageServerClientManager.class)) .withDefaultBackendUri(uri) .build(); } @@ -90,10 +95,23 @@ public class TestStorageContainerStoreBuilder { .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) .withStorageResources(StorageResources.create()) .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .withDefaultBackendUri(null) .build(); } + @Test(expected = NullPointerException.class) + public void testBuildStorageServerClientManager() { + StorageContainerStoreBuilder.newBuilder() + .withStorageConfiguration(mock(StorageConfiguration.class)) + .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) + .withStorageResources(StorageResources.create()) + .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(null) + .withDefaultBackendUri(uri) + .build(); + } + @Test public void testBuild() { StorageContainerStore storageContainerStore = StorageContainerStoreBuilder.newBuilder() @@ -101,6 +119,7 @@ public class TestStorageContainerStoreBuilder { .withStorageContainerManagerFactory(mock(StorageContainerManagerFactory.class)) .withStorageResources(StorageResources.create()) .withRangeStoreFactory(storeFactory) + .withStorageServerClientManager(() -> mock(StorageServerClientManager.class)) .withDefaultBackendUri(uri) .build(); assertTrue(storageContainerStore instanceof StorageContainerStoreImpl); diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java index b9d5e69..6ae3658 100644 --- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java @@ -85,6 +85,7 @@ import 
org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRange import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc; import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub; import org.apache.bookkeeper.stream.proto.storage.StatusCode; +import org.apache.bookkeeper.stream.storage.StorageResources; import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService; import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory; import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; @@ -133,6 +134,7 @@ public class TestStorageContainerStoreImpl { NamespaceConfiguration.newBuilder() .setDefaultStreamConf(DEFAULT_STREAM_CONF) .build(); + private final StorageResources resources = StorageResources.create(); private RangeStoreService mockRangeStoreService; private StorageContainerStoreImpl rangeStore; private Server server; @@ -230,6 +232,7 @@ public class TestStorageContainerStoreImpl { (storeConf, rgRegistry) -> new LocalStorageContainerManager(endpoint, storeConf, rgRegistry, 2), new RangeStoreContainerServiceFactoryImpl(rangeStoreServiceFactory), + null, NullStatsLogger.INSTANCE); rangeStore.start(); diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java new file mode 100644 index 0000000..8201ba7 --- /dev/null +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RangeRoutingTableImplTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; + +import io.grpc.stub.StreamObserver; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase; +import org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils; +import org.apache.bookkeeper.clients.impl.routing.RangeRouter; +import org.apache.bookkeeper.common.router.BytesHashRouter; +import org.apache.bookkeeper.stream.proto.RangeProperties; +import org.apache.bookkeeper.stream.proto.StreamConfiguration; +import org.apache.bookkeeper.stream.proto.StreamProperties; +import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest; +import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse; +import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest; +import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse; +import 
org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase; +import org.apache.bookkeeper.stream.proto.storage.RelatedRanges; +import org.apache.bookkeeper.stream.proto.storage.RelationType; +import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceImplBase; +import org.apache.bookkeeper.stream.proto.storage.StatusCode; +import org.apache.bookkeeper.stream.protocol.util.ProtoUtils; +import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl; +import org.junit.Test; + +/** + * Unit test {@link RangeRoutingTable}. + */ +public class RangeRoutingTableImplTest extends GrpcClientTestBase { + + private final long scId = 1234L; + private final long streamId = 123456L; + private GetActiveRangesResponse getActiveRangesResponse; + private CompletableFuture<GetActiveRangesResponse> responseSupplier; + private StreamProperties props; + private List<RangeProperties> rangeProps; + private RangeRoutingTableImpl routingTable; + private RangeRouter<byte[]> rangeRouter; + + @Override + protected void doSetup() throws Exception { + this.props = StreamProperties.newBuilder() + .setStorageContainerId(scId) + .setStreamId(streamId) + .setStreamName("metaclient-stream") + .setStreamConf(StreamConfiguration.newBuilder().build()) + .build(); + this.rangeProps = ProtoUtils.split( + streamId, + 24, + 23456L, + StorageContainerPlacementPolicyImpl.of(4) + ); + final GetActiveRangesResponse.Builder getActiveRangesResponseBuilder = GetActiveRangesResponse.newBuilder(); + for (RangeProperties range : rangeProps) { + RelatedRanges.Builder rrBuilder = RelatedRanges.newBuilder() + .setProps(range) + .setType(RelationType.PARENTS) + .addAllRelatedRanges(Collections.emptyList()); + getActiveRangesResponseBuilder.addRanges(rrBuilder); + } + this.getActiveRangesResponse = getActiveRangesResponseBuilder + .setCode(StatusCode.SUCCESS) + .build(); + RootRangeServiceImplBase rootRangeService = new 
RootRangeServiceImplBase() { + @Override + public void getStream(GetStreamRequest request, + StreamObserver<GetStreamResponse> responseObserver) { + responseObserver.onNext(GetStreamResponse.newBuilder() + .setCode(StatusCode.SUCCESS) + .setStreamProps(props) + .build()); + responseObserver.onCompleted(); + } + }; + serviceRegistry.addService(rootRangeService); + + this.responseSupplier = new CompletableFuture<>(); + // register a good meta range service + MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() { + @Override + public void getActiveRanges(GetActiveRangesRequest request, + StreamObserver<GetActiveRangesResponse> responseObserver) { + try { + responseObserver.onNext(responseSupplier.get()); + responseObserver.onCompleted(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + responseObserver.onError(e); + } catch (ExecutionException e) { + responseObserver.onError(e); + } + } + }; + serviceRegistry.addService(metaRangeService); + + this.routingTable = new RangeRoutingTableImpl(serverManager); + this.rangeRouter = new RangeRouter<>(BytesHashRouter.of()); + this.rangeRouter.setRanges(ProtocolInternalUtils.createActiveRanges(getActiveRangesResponse)); + } + + @Override + protected void doTeardown() throws Exception { + } + + @Test + public void testGetRange() throws Exception { + String key = "foo"; + byte[] keyBytes = key.getBytes(UTF_8); + RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes); + // the first get will return null since nothing is cached yet + assertNull(rangeProps); + // the fetch request is outstanding + CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture = + routingTable.getOutstandingFetchRequest(streamId); + assertNotNull(outstandingFetchFuture); + assertFalse(outstandingFetchFuture.isDone()); + + // complete the response supplier, so the fetch request can complete to update the cache + responseSupplier.complete(getActiveRangesResponse); + + // wait until the range router
is cached. + while (null == routingTable.getRangeRouter(streamId)) { + TimeUnit.MILLISECONDS.sleep(100); + } + + // if the router is created, it should return the cached router + rangeProps = routingTable.getRange(streamId, keyBytes); + assertNotNull(rangeProps); + assertEquals(rangeRouter.getRangeProperties(keyBytes), rangeProps); + } + + @Test + public void testGetRangeException() throws Exception { + String key = "foo"; + byte[] keyBytes = key.getBytes(UTF_8); + RangeProperties rangeProps = routingTable.getRange(streamId, keyBytes); + // the first get will return null since nothing is cached yet + assertNull(rangeProps); + // the fetch request is outstanding + CompletableFuture<RangeRouter<byte[]>> outstandingFetchFuture = + routingTable.getOutstandingFetchRequest(streamId); + assertNotNull(outstandingFetchFuture); + assertFalse(outstandingFetchFuture.isDone()); + + // fail the response supplier, so the outstanding fetch request completes exceptionally + responseSupplier.completeExceptionally(new Exception("fetch failed")); + + // wait until the fetch is done.
+ try { + outstandingFetchFuture.get(); + fail("Fetch request should fail"); + } catch (Exception e) { + // expected + } + + // once the fetch is done, nothing should be cached and the outstanding fetch request should be removed + assertNull(routingTable.getRangeRouter(streamId)); + assertNull(routingTable.getOutstandingFetchRequest(streamId)); + } + +} diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java similarity index 94% rename from stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java rename to stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java index 745118b..43c060b 100644 --- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/interceptors/RoutingHeaderClientInterceptorTest.java +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/RoutingHeaderProxyInterceptorTest.java @@ -17,9 +17,12 @@ * under the License. */ -package org.apache.bookkeeper.clients.impl.kv.interceptors; +package org.apache.bookkeeper.stream.storage.impl.routing; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RID_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.RK_METADATA_KEY; +import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.SID_METADATA_KEY; import static org.junit.Assert.assertEquals; import com.google.protobuf.ByteString; @@ -54,10 +57,10 @@ import org.apache.bookkeeper.stream.proto.storage.StatusCode; import org.junit.Test; /** - * Unit test {@link RoutingHeaderClientInterceptor}. + * Unit test {@link RoutingHeaderProxyInterceptor}. 
*/ @Slf4j -public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase { +public class RoutingHeaderProxyInterceptorTest extends GrpcClientTestBase { private final long streamId = 1234L; private final long rangeId = 2345L; @@ -136,11 +139,12 @@ public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase { }; serviceRegistry.addService(tableService.bindService()); + this.channel = new StorageServerChannel( InProcessChannelBuilder.forName(serverName).directExecutor().build(), Optional.empty() ).intercept( - new RoutingHeaderClientInterceptor(), + new RoutingHeaderProxyInterceptor(), new ClientInterceptor() { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, @@ -152,15 +156,15 @@ public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase { log.info("Intercept the request with routing information : sid = {}, rid = {}, rk = {}", streamId, rangeId, new String(routingKey, UTF_8)); headers.put( - RoutingHeaderClientInterceptor.RID_METADATA_KEY, + RID_METADATA_KEY, rangeId ); headers.put( - RoutingHeaderClientInterceptor.SID_METADATA_KEY, + SID_METADATA_KEY, streamId ); headers.put( - RoutingHeaderClientInterceptor.RK_METADATA_KEY, + RK_METADATA_KEY, routingKey ); delegate().start(responseListener, headers); @@ -173,6 +177,7 @@ public class RoutingHeaderClientInterceptorTest extends GrpcClientTestBase { @Override protected void doTeardown() { + channel.close(); } @Test diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java new file mode 100644 index 0000000..bb94843 --- /dev/null +++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/routing/StorageContainerProxyChannelManagerImplTest.java @@ -0,0 +1,111 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.stream.storage.impl.routing; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import io.grpc.Channel; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase; +import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointRequest; +import org.apache.bookkeeper.stream.proto.storage.GetStorageContainerEndpointResponse; +import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointRequest; +import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse; +import org.apache.bookkeeper.stream.proto.storage.StatusCode; +import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint; +import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceImplBase; +import org.junit.Test; + +/** + * Unit testing {@link StorageContainerProxyChannelManagerImpl}. 
+ */ +public class StorageContainerProxyChannelManagerImplTest extends GrpcClientTestBase { + + private final long scId = 1234L; + private StorageContainerProxyChannelManagerImpl proxyChannelManager; + + @Override + protected void doSetup() throws Exception { + this.proxyChannelManager = new StorageContainerProxyChannelManagerImpl(serverManager); + } + + @Override + protected void doTeardown() throws Exception { + } + + + @Test + public void testGetStorageContainerChannel() throws Exception { + final CompletableFuture<GetStorageContainerEndpointRequest> receivedRequest = new CompletableFuture<>(); + final CompletableFuture<GetStorageContainerEndpointResponse> responseSupplier = new CompletableFuture<>(); + StorageContainerServiceImplBase scService = new StorageContainerServiceImplBase() { + @Override + public void getStorageContainerEndpoint( + GetStorageContainerEndpointRequest request, + StreamObserver<GetStorageContainerEndpointResponse> responseObserver) { + receivedRequest.complete(request); + try { + responseObserver.onNext(responseSupplier.get()); + responseObserver.onCompleted(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + responseObserver.onError(e); + } catch (ExecutionException e) { + responseObserver.onError(e); + } + } + }; + serviceRegistry.addService(scService.bindService()); + + Channel channel = proxyChannelManager.getStorageContainerChannel(scId); + // if the location service doesn't respond, the channel will be null + assertNull(channel); + + // complete the location service request + responseSupplier.complete(getResponse(receivedRequest.get())); + while ((channel = proxyChannelManager.getStorageContainerChannel(scId)) == null) { + TimeUnit.MILLISECONDS.sleep(100); + } + assertNotNull(channel); + } + + private static GetStorageContainerEndpointResponse getResponse(GetStorageContainerEndpointRequest request) { + GetStorageContainerEndpointResponse.Builder respBuilder = + 
GetStorageContainerEndpointResponse.newBuilder(); + respBuilder.setStatusCode(StatusCode.SUCCESS); + for (OneStorageContainerEndpointRequest oneReq : request.getRequestsList()) { + OneStorageContainerEndpointResponse oneResp = OneStorageContainerEndpointResponse.newBuilder() + .setEndpoint(StorageContainerEndpoint.newBuilder() + .setStorageContainerId(oneReq.getStorageContainer()) + .setRevision(oneReq.getRevision() + 1) + .setRwEndpoint(ENDPOINT)) + .build(); + respBuilder.addResponses(oneResp); + } + return respBuilder.build(); + } + + + +}