http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java deleted file mode 100644 index 6b2a965..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.client.handlers; - -import java.net.InetSocketAddress; - - -import org.jboss.netty.channel.Channel; - -import com.google.protobuf.ByteString; - -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.netty.HChannelManager; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.util.Callback; - -/** - * An abstract response handler provided to manage all subscriptions on a channel. - * - * Its responsibility is to handle all subscribe responses received on that channel, - * clear up subscriptions and retry reconnecting subscriptions when the channel is disconnected, - * and handle delivering messages to {@link MessageHandler} and sending consume messages - * back to hub servers. - */ -public abstract class SubscribeResponseHandler extends AbstractResponseHandler { - - protected SubscribeResponseHandler(ClientConfiguration cfg, - HChannelManager channelManager) { - super(cfg, channelManager); - } - - /** - * Handle Message delivered by the server. - * - * @param response - * Message received from the server. - */ - public abstract void handleSubscribeMessage(PubSubResponse response); - - /** - * Handle a subscription event delivered by the server. 
- * - * @param topic - * Topic Name - * @param subscriberId - * Subscriber Id - * @param event - * Subscription Event describes its status - */ - public abstract void handleSubscriptionEvent(ByteString topic, - ByteString subscriberId, - SubscriptionEvent event); - - /** - * Method called when a message arrives for a subscribe Channel and we want - * to deliver it asynchronously via the registered MessageHandler (should - * not be null when called here). - * - * @param message - * Message from Subscribe Channel we want to consume. - */ - protected abstract void asyncMessageDeliver(TopicSubscriber topicSubscriber, - Message message); - - /** - * Method called when the client app's MessageHandler has asynchronously - * completed consuming a subscribed message sent from the server. The - * contract with the client app is that messages sent to the handler to be - * consumed will have the callback response done in the same order. So if we - * asynchronously call the MessageHandler to consume messages #1-5, that - * should call the messageConsumed method here via the VoidCallback in the - * same order. To make this thread safe, since multiple outstanding messages - * could be consumed by the client app and then called back to here, make - * this method synchronized. - * - * @param topicSubscriber - * Topic Subscriber - * @param message - * Message sent from server for topic subscription that has been - * consumed by the client. - */ - protected abstract void messageConsumed(TopicSubscriber topicSubscriber, - Message message); - - /** - * Start delivering messages for a given topic subscriber. - * - * @param topicSubscriber - * Topic Subscriber - * @param messageHandler - * MessageHandler to register for this ResponseHandler instance. - * @throws ClientNotSubscribedException - * If the client is not currently subscribed to the topic - * @throws AlreadyStartDeliveryException - * If delivery is started with a message handler before the existing one has been stopped. 
- */ - public abstract void startDelivery(TopicSubscriber topicSubscriber, - MessageHandler messageHandler) - throws ClientNotSubscribedException, AlreadyStartDeliveryException; - - /** - * Stop delivering messages for a given topic subscriber. - * - * @param topicSubscriber - * Topic Subscriber - * @throws ClientNotSubscribedException - * If the client is not currently subscribed to the topic - */ - public abstract void stopDelivery(TopicSubscriber topicSubscriber) - throws ClientNotSubscribedException; - - /** - * Whether the given topic subscriber subscribed through this handler. - * - * @param topicSubscriber - * Topic Subscriber - * @return whether the given topic subscriber subscribed through this handler. - */ - public abstract boolean hasSubscription(TopicSubscriber topicSubscriber); - - /** - * Close subscription from this handler. - * - * @param topicSubscriber - * Topic Subscriber - * @param callback - * Callback when the subscription is closed. - * @param context - * Callback context. - */ - public abstract void asyncCloseSubscription(TopicSubscriber topicSubscriber, - Callback<ResponseBody> callback, - Object context); - - /** - * Consume a given message for given topic subscriber through this handler. - * - * @param topicSubscriber - * Topic Subscriber - */ - public abstract void consume(TopicSubscriber topicSubscriber, - MessageSeqId messageSeqId); - - /** - * This method is called when the underlying channel is disconnected due to server failure. - * - * The implementation should take the responsibility to clear subscriptions and retry - * reconnecting subscriptions to new hub servers. - * - * @param host - * Host that channel connected to has disconnected. - * @param channel - * Channel connected to. - */ - public abstract void onChannelDisconnected(InetSocketAddress host, - Channel channel); -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java deleted file mode 100644 index 0d3e8a6..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/UnsubscribeResponseHandler.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.client.handlers; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.Channel; - -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.netty.HChannelManager; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse; - -public class UnsubscribeResponseHandler extends AbstractResponseHandler { - - private static final Logger logger = LoggerFactory.getLogger(UnsubscribeResponseHandler.class); - - public UnsubscribeResponseHandler(ClientConfiguration cfg, - HChannelManager channelManager) { - super(cfg, channelManager); - } - - @Override - public void handleResponse(final PubSubResponse response, final PubSubData pubSubData, - final Channel channel) - throws Exception { - switch (response.getStatusCode()) { - case SUCCESS: - // For an unsubscribe request the subscription is closed first, - // so nothing is left to do here except complete the callback. - pubSubData.getCallback().operationFinished(pubSubData.context, null); - break; - case CLIENT_NOT_SUBSCRIBED: - // For Unsubscribe requests, the server says that the client was - // never subscribed to the topic. - pubSubData.getCallback().operationFailed(pubSubData.context, new ClientNotSubscribedException( - "Client was never subscribed to topic: " + - pubSubData.topic.toStringUtf8() + ", subscriberId: " + - pubSubData.subscriberId.toStringUtf8())); - break; - case SERVICE_DOWN: - // Response was service down failure so just invoke the callback's - // operationFailed method. 
- pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException( - "Server responded with a SERVICE_DOWN status")); - break; - case NOT_RESPONSIBLE_FOR_TOPIC: - // Redirect response so we'll need to repost the original - // Unsubscribe Request - handleRedirectResponse(response, pubSubData, channel); - break; - default: - // Consider all other status codes as errors, operation failed - // cases. - logger.error("Unexpected error response from server for PubSubResponse: " + response); - pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException( - "Server responded with a status code of: " + - response.getStatusCode())); - break; - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java deleted file mode 100644 index a6ba2a6..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/CleanupChannelMap.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty; - -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CleanupChannelMap<T> { - - private static final Logger logger = LoggerFactory.getLogger(CleanupChannelMap.class); - - private final ConcurrentHashMap<T, HChannel> channels; - - // Boolean indicating if the channel map is closed or not; read and written in this class under closedLock. - protected boolean closed = false; - protected final ReentrantReadWriteLock closedLock = - new ReentrantReadWriteLock(); - - public CleanupChannelMap() { - channels = new ConcurrentHashMap<T, HChannel>(); - } - - /** - * Add channel to the map. If an old channel has been bound - * to <code>key</code>, the <code>channel</code> would be - * closed immediately and the old channel is returned. Otherwise, - * the <code>channel</code> is put in the map for future usage. - * - * If the channel map has been closed, the channel would be closed - * immediately and that same (closed) channel is returned. - * - * @param key - * Key - * @param channel - * Channel - * @return the channel instance to use. - */ - public HChannel addChannel(T key, HChannel channel) { - this.closedLock.readLock().lock(); - try { - if (closed) { - channel.close(); - return channel; - } - HChannel oldChannel = channels.putIfAbsent(key, channel); - if (null != oldChannel) { - logger.info("Channel for {} already exists, so no need to store it.", key); - channel.close(); - return oldChannel; - } else { - logger.debug("Storing a new channel for {}.", key); - return channel; - } - } finally { - this.closedLock.readLock().unlock(); - } - } - - /** - * Replace channel only if currently mapped to the given <code>oldChannel</code>. 
- * - * @param key - * Key - * @param oldChannel - * Old Channel - * @param newChannel - * New Channel - * @return true if replaced successfully, otherwise false (in which case <code>newChannel</code> has been closed). - */ - public boolean replaceChannel(T key, HChannel oldChannel, HChannel newChannel) { - this.closedLock.readLock().lock(); - try { - if (closed) { - if (null != oldChannel) oldChannel.close(); - if (null != newChannel) newChannel.close(); - return false; - } - if (null == oldChannel) { - HChannel existedChannel = channels.putIfAbsent(key, newChannel); - if (null != existedChannel) { - logger.info("Channel for {} already exists, so no need to replace it.", key); - newChannel.close(); - return false; - } else { - logger.debug("Storing a new channel for {}.", key); - return true; - } - } else { - if (channels.replace(key, oldChannel, newChannel)) { - logger.debug("Replacd channel {} for {}.", oldChannel, key); - oldChannel.close(); - return true; - } else { - newChannel.close(); - return false; - } - } - } finally { - this.closedLock.readLock().unlock(); - } - } - - /** - * Returns the channel bound with <code>key</code>. - * - * @param key Key - * @return the channel bound with <code>key</code>. - */ - public HChannel getChannel(T key) { - return channels.get(key); - } - - /** - * Remove the channel bound with <code>key</code>. - * - * @param key Key - * @return the channel bound with <code>key</code>, null if no channel - * is bound with <code>key</code>. - */ - public HChannel removeChannel(T key) { - return channels.remove(key); - } - - /** - * Remove the channel bound with <code>key</code> only if it is the given <code>channel</code>. - * - * @param key Key - * @param channel The channel expected to be bound with <code>key</code>. - * @return true if the channel is removed, false otherwise. - */ - public boolean removeChannel(T key, HChannel channel) { - return channels.remove(key, channel); - } - - /** - * Return the channels in the map. - * - * @return the collection of channels. 
- */ - public Collection<HChannel> getChannels() { - return channels.values(); - } - - /** - * Close the channels map: mark it closed, then close every channel (waiting for each) and clear the map. - */ - public void close() { - closedLock.writeLock().lock(); - try { - if (closed) { - return; - } - closed = true; - } finally { - closedLock.writeLock().unlock(); - } - logger.debug("Closing channels map."); - for (HChannel channel : channels.values()) { - channel.close(true); - } - channels.clear(); - logger.debug("Closed channels map."); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java deleted file mode 100644 index 94e0a80..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/FilterableMessageHandler.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.client.netty; - -import com.google.protobuf.ByteString; - -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.filter.ClientMessageFilter; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.util.Callback; - -/** - * Handler used by a subscription: a message is passed to the wrapped message handler only if the message filter (when one is set) accepts it; otherwise the callback is completed without delivering the message. - */ -public class FilterableMessageHandler implements MessageHandler { - - MessageHandler msgHandler; - ClientMessageFilter msgFilter; - - public FilterableMessageHandler(MessageHandler msgHandler, - ClientMessageFilter msgFilter) { - this.msgHandler = msgHandler; - this.msgFilter = msgFilter; - } - - public boolean hasMessageHandler() { - return null != msgHandler; - } - - public MessageHandler getMessageHandler() { - return msgHandler; - } - - public boolean hasMessageFilter() { - return null != msgFilter; - } - - public ClientMessageFilter getMessageFilter() { - return msgFilter; - } - - @Override - public void deliver(ByteString topic, ByteString subscriberId, Message msg, - Callback<Void> callback, Object context) { - boolean deliver = true; - if (hasMessageFilter()) { - deliver = msgFilter.testMessage(msg); - } - if (deliver) { - msgHandler.deliver(topic, subscriberId, msg, callback, context); - } else { - callback.operationFinished(context, null); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java deleted file mode 100644 index 340cec5..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannel.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty; - -import org.jboss.netty.channel.Channel; -import org.apache.hedwig.client.data.PubSubData; - -/** - * A wrapper interface over netty {@link Channel} to submit hedwig's - * {@link PubSubData} requests. - */ -public interface HChannel { - - /** - * Submit a pub/sub request. - * - * @param op - * Pub/Sub Request. - */ - public void submitOp(PubSubData op); - - /** - * @return underlying netty channel - */ - public Channel getChannel(); - - /** - * Close the channel without waiting. - */ - public void close(); - - /** - * Close the channel. - * - * @param wait - * Whether to wait until the channel is closed. 
- */ - public void close(boolean wait); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java deleted file mode 100644 index 6fae6bb..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HChannelManager.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.client.netty; - -import java.net.InetSocketAddress; -import java.util.TimerTask; - -import org.apache.hedwig.client.api.MessageHandler; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException; -import org.apache.hedwig.client.handlers.SubscribeResponseHandler; -import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.util.Callback; - -/** - * A manager that manages 1) all channels established to hub servers, - * 2) the actions taken by the topic subscribers. - */ -public interface HChannelManager { - - /** - * Submit a pub/sub request after a given <code>delay</code>. - * - * @param op - * Pub/Sub Request. - * @param delay - * Delay time in ms. - */ - public void submitOpAfterDelay(PubSubData op, long delay); - - /** - * Submit a pub/sub request. - * - * @param pubSubData - * Pub/Sub Request. - */ - public void submitOp(PubSubData pubSubData); - - /** - * Submit a pub/sub request to default server. - * - * @param pubSubData - * Pub/Sub request. - */ - public void submitOpToDefaultServer(PubSubData pubSubData); - - /** - * Submit a pub/sub request to a given host. - * - * @param pubSubData - * Pub/Sub request. - * @param host - * Given host address. - */ - public void redirectToHost(PubSubData pubSubData, InetSocketAddress host); - - /** - * Generate the next transaction id for pub/sub requests sent through this manager. - * - * @return next transaction id. - */ - public long nextTxnId(); - - /** - * Schedule a timer task after a given <code>delay</code>. - * - * @param task - * A timer task - * @param delay - * Delay time in ms. - */ - public void schedule(TimerTask task, long delay); - - /** - * Get the subscribe response handler that manages the given <code>topicSubscriber</code>. 
- * - * @param topicSubscriber - * Topic Subscriber - * @return the subscribe response handler managing it, otherwise return null. - */ - public SubscribeResponseHandler getSubscribeResponseHandler( - TopicSubscriber topicSubscriber); - - /** - * Start delivering messages for a given topic subscriber. - * - * @param topicSubscriber - * Topic Subscriber - * @param messageHandler - * MessageHandler to register for this ResponseHandler instance. - * @throws ClientNotSubscribedException - * If the client is not currently subscribed to the topic - * @throws AlreadyStartDeliveryException - * If delivery is started with a message handler before the existing one has been stopped. - */ - public void startDelivery(TopicSubscriber topicSubscriber, - MessageHandler messageHandler) - throws ClientNotSubscribedException, AlreadyStartDeliveryException; - - /** - * Stop delivering messages for a given topic subscriber. - * - * @param topicSubscriber - * Topic Subscriber - * @throws ClientNotSubscribedException - * If the client is not currently subscribed to the topic - */ - public void stopDelivery(TopicSubscriber topicSubscriber) - throws ClientNotSubscribedException; - - /** - * Close the subscription of the given <code>topicSubscriber</code>. - * - * @param topicSubscriber - * Topic Subscriber - * @param callback - * Callback - * @param context - * Callback context - */ - public void asyncCloseSubscription(TopicSubscriber topicSubscriber, - Callback<ResponseBody> callback, - Object context); - - /** - * Return the subscription event emitter to emit subscription events. - * - * @return subscription event emitter. - */ - public SubscriptionEventEmitter getSubscriptionEventEmitter(); - - /** - * Whether the channel manager is closed. - * - * @return true if the channel manager is closed, otherwise return false. - */ - public boolean isClosed(); - - /** - * Close the channel manager. 
- */ - public void close(); -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java deleted file mode 100644 index 8ae0e82..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.client.netty; - -import java.util.concurrent.Executors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.hedwig.client.api.Client; -import org.apache.hedwig.client.conf.ClientConfiguration; -import org.apache.hedwig.client.netty.impl.simple.SimpleHChannelManager; -import org.apache.hedwig.client.netty.impl.multiplex.MultiplexHChannelManager; - -/** - * This is a top level Hedwig Client class that encapsulates the common - * functionality needed for both Publish and Subscribe operations. - * - */ -public class HedwigClientImpl implements Client { - - private static final Logger logger = LoggerFactory.getLogger(HedwigClientImpl.class); - - // The Netty socket factory for making connections to the server. - protected final ChannelFactory socketFactory; - // Whether the socket factory is one we created or is owned by whoever - // instantiated us. - protected boolean ownChannelFactory = false; - - // channel manager manages all the channels established by the client - protected final HChannelManager channelManager; - - private HedwigSubscriber sub; - private final HedwigPublisher pub; - private final ClientConfiguration cfg; - - public static Client create(ClientConfiguration cfg) { - return new HedwigClientImpl(cfg); - } - - public static Client create(ClientConfiguration cfg, ChannelFactory socketFactory) { - return new HedwigClientImpl(cfg, socketFactory); - } - - // Base constructor that takes in a Configuration object. - // This will create its own client socket channel factory. 
- protected HedwigClientImpl(ClientConfiguration cfg) { - this(cfg, new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat("HedwigClient-NIOBoss-%d").build()), - Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat("HedwigClient-NIOWorker-%d").build()))); - ownChannelFactory = true; - } - - // Constructor that takes in a Configuration object and a ChannelFactory - // that has already been instantiated by the caller. A multiplex channel manager is used when subscription channel sharing is enabled, a simple one otherwise. - protected HedwigClientImpl(ClientConfiguration cfg, ChannelFactory socketFactory) { - this.cfg = cfg; - this.socketFactory = socketFactory; - if (cfg.isSubscriptionChannelSharingEnabled()) { - channelManager = new MultiplexHChannelManager(cfg, socketFactory); - } else { - channelManager = new SimpleHChannelManager(cfg, socketFactory); - } - pub = new HedwigPublisher(this); - sub = new HedwigSubscriber(this); - } - - public ClientConfiguration getConfiguration() { - return cfg; - } - - public HChannelManager getHChannelManager() { - return channelManager; - } - - public HedwigSubscriber getSubscriber() { - return sub; - } - - // Protected method to set the subscriber. This is needed currently for hub - // versions of the client subscriber. - protected void setSubscriber(HedwigSubscriber sub) { - this.sub = sub; - } - - public HedwigPublisher getPublisher() { - return pub; - } - - // When we are done with the client, this is a clean way to gracefully close - // all channels/sockets created by the client and to also release all - // resources used by netty. - public void close() { - logger.info("Stopping the client!"); - - // close channel manager to release all channels - channelManager.close(); - - // Release resources used by the ChannelFactory on the client if we are - // the owner that created it. NOTE(review): this is not in a finally block, so it is not reached if channelManager.close() above throws. 
- if (ownChannelFactory) { - socketFactory.releaseExternalResources(); - } - logger.info("Completed stopping the client!"); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java deleted file mode 100644 index 271d1eb..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.client.netty; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; - -import org.apache.hedwig.client.api.Publisher; -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.handlers.PubSubCallback; -import org.apache.hedwig.exceptions.PubSubException; -import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException; -import org.apache.hedwig.exceptions.PubSubException.ServiceDownException; -import org.apache.hedwig.protocol.PubSubProtocol.Message; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.PublishResponse; -import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody; -import org.apache.hedwig.util.Callback; - -/** - * This is the Hedwig Netty specific implementation of the Publisher interface. - * - */ -public class HedwigPublisher implements Publisher { - - private static final Logger logger = LoggerFactory.getLogger(HedwigPublisher.class); - - private final HChannelManager channelManager; - - protected HedwigPublisher(HedwigClientImpl client) { - this.channelManager = client.getHChannelManager(); - } - - public PublishResponse publish(ByteString topic, Message msg) - throws CouldNotConnectException, ServiceDownException { - - if (logger.isDebugEnabled()) { - logger.debug("Calling a sync publish for topic: {}, msg: {}.", - topic.toStringUtf8(), msg); - } - PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, null, null); - synchronized (pubSubData) { - PubSubCallback pubSubCallback = new PubSubCallback(pubSubData); - asyncPublishWithResponseImpl(topic, msg, pubSubCallback, null); - try { - while (!pubSubData.isDone) - pubSubData.wait(); - } catch (InterruptedException e) { - throw new ServiceDownException("Interrupted Exception while waiting for async publish call"); - } - // Check from the PubSubCallback if it was successful or not. 
- if (!pubSubCallback.getIsCallSuccessful()) { - // See what the exception was that was thrown when the operation - // failed. - PubSubException failureException = pubSubCallback.getFailureException(); - if (failureException == null) { - // This should not happen as the operation failed but a null - // PubSubException was passed. Log a warning message but - // throw a generic ServiceDownException. - logger.error("Sync Publish operation failed but no PubSubException was passed!"); - throw new ServiceDownException("Server ack response to publish request is not successful"); - } - // For the expected exceptions that could occur, just rethrow - // them. - else if (failureException instanceof CouldNotConnectException) { - throw (CouldNotConnectException) failureException; - } else if (failureException instanceof ServiceDownException) { - throw (ServiceDownException) failureException; - } else { - // For other types of PubSubExceptions, just throw a generic - // ServiceDownException but log a warning message. - logger.error("Unexpected exception type when a sync publish operation failed: ", - failureException); - throw new ServiceDownException("Server ack response to publish request is not successful"); - } - } - - ResponseBody respBody = pubSubCallback.getResponseBody(); - if (null == respBody) { - return null; - } - return respBody.hasPublishResponse() ? respBody.getPublishResponse() : null; - } - } - - public void asyncPublish(ByteString topic, Message msg, - final Callback<Void> callback, Object context) { - asyncPublishWithResponseImpl(topic, msg, - new VoidCallbackAdapter<ResponseBody>(callback), context); - } - - public void asyncPublishWithResponse(ByteString topic, Message msg, - Callback<PublishResponse> callback, - Object context) { - // adapt the callback. 
- asyncPublishWithResponseImpl(topic, msg, - new PublishResponseCallbackAdapter(callback), context); - } - - private void asyncPublishWithResponseImpl(ByteString topic, Message msg, - Callback<ResponseBody> callback, - Object context) { - if (logger.isDebugEnabled()) { - logger.debug("Calling an async publish for topic: {}, msg: {}.", - topic.toStringUtf8(), msg); - } - PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, - callback, context); - channelManager.submitOp(pubSubData); - } - - private static class PublishResponseCallbackAdapter implements Callback<ResponseBody>{ - - private final Callback<PublishResponse> delegate; - - private PublishResponseCallbackAdapter(Callback<PublishResponse> delegate) { - this.delegate = delegate; - } - - @Override - public void operationFinished(Object ctx, ResponseBody resultOfOperation) { - if (null == resultOfOperation) { - delegate.operationFinished(ctx, null); - } else { - delegate.operationFinished(ctx, resultOfOperation.getPublishResponse()); - } - } - - @Override - public void operationFailed(Object ctx, PubSubException exception) { - delegate.operationFailed(ctx, exception); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java deleted file mode 100644 index 0eda290..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java +++ /dev/null @@ -1,422 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hedwig.client.netty;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.ByteString;

import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.client.handlers.SubscribeResponseHandler;
import org.apache.hedwig.filter.ClientMessageFilter;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.SubscriptionListener;

/**
 * This is the Hedwig Netty specific implementation of the Subscriber interface.
 *
 * Subscribe, unsubscribe and close-subscription requests are handed to the
 * client's {@link HChannelManager}; the synchronous variants block on a
 * {@link PubSubData} monitor until its isDone flag is set.
 */
@SuppressWarnings("deprecation") // so that we can implement the deprecated subscribe methods without a warning
public class HedwigSubscriber implements Subscriber {

    private static final Logger logger = LoggerFactory.getLogger(HedwigSubscriber.class);

    protected final ClientConfiguration cfg;
    protected final HChannelManager channelManager;

    /**
     * Create a subscriber that uses the configuration and channel manager of
     * the given client.
     *
     * @param client
     *          Client providing the configuration and channel manager.
     */
    public HedwigSubscriber(HedwigClientImpl client) {
        this.cfg = client.getConfiguration();
        this.channelManager = client.getHChannelManager();
    }

    /** Register a listener on the channel manager's subscription event emitter. */
    public void addSubscriptionListener(SubscriptionListener listener) {
        channelManager.getSubscriptionEventEmitter()
            .addSubscriptionListener(listener);
    }

    /** Remove a listener from the channel manager's subscription event emitter. */
    public void removeSubscriptionListener(SubscriptionListener listener) {
        channelManager.getSubscriptionEventEmitter()
            .removeSubscriptionListener(listener);
    }

    // Emits the debug line shared by the sync and async subUnsub paths.
    // callKind is either "sync" or "async".
    private static void logSubUnsubRequest(String callKind, ByteString topic, ByteString subscriberId,
                                           OperationType operationType, SubscriptionOptions options) {
        if (!logger.isDebugEnabled()) {
            return;
        }
        StringBuilder debugMsg = new StringBuilder().append("Calling a ").append(callKind)
            .append(" subUnsub request for topic: ")
            .append(topic.toStringUtf8()).append(", subscriberId: ")
            .append(subscriberId.toStringUtf8()).append(", operationType: ")
            .append(operationType);
        if (null != options) {
            debugMsg.append(", createOrAttach: ").append(options.getCreateOrAttach())
                .append(", messageBound: ").append(options.getMessageBound());
        }
        logger.debug(debugMsg.toString());
    }

    // Private method that holds the common logic for doing synchronous
    // Subscribe or Unsubscribe requests. This is for code reuse since these
    // two flows are very similar. The assumption is that the input
    // OperationType is either SUBSCRIBE or UNSUBSCRIBE.
    private void subUnsub(ByteString topic, ByteString subscriberId, OperationType operationType,
                          SubscriptionOptions options)
        throws CouldNotConnectException, ClientAlreadySubscribedException,
        ClientNotSubscribedException, ServiceDownException {
        logSubUnsubRequest("sync", topic, subscriberId, operationType, options);
        // This PubSubData only serves as the monitor and completion flag for
        // the synchronous wait; the request itself is built by asyncSubUnsub.
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            asyncSubUnsub(topic, subscriberId, pubSubCallback, null, operationType, options);
            try {
                // Loop to guard against spurious wakeups.
                while (!pubSubData.isDone) {
                    pubSubData.wait();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so that code further up the
                // stack can still observe that the thread was interrupted.
                Thread.currentThread().interrupt();
                throw new ServiceDownException("Interrupted Exception while waiting for async subUnsub call");
            }
            // Check from the PubSubCallback if it was successful or not.
            if (pubSubCallback.getIsCallSuccessful()) {
                return;
            }
            // See what the exception was that was thrown when the operation
            // failed.
            PubSubException failureException = pubSubCallback.getFailureException();
            if (failureException == null) {
                // This should not happen as the operation failed but a null
                // PubSubException was passed. Log an error message and
                // throw a generic ServiceDownException.
                logger.error("Sync SubUnsub operation failed but no PubSubException was passed!");
                throw new ServiceDownException("Server ack response to SubUnsub request is not successful");
            }
            // For the expected exceptions that could occur, just rethrow
            // them.
            if (failureException instanceof CouldNotConnectException) {
                throw (CouldNotConnectException) failureException;
            }
            if (failureException instanceof ClientAlreadySubscribedException) {
                throw (ClientAlreadySubscribedException) failureException;
            }
            if (failureException instanceof ClientNotSubscribedException) {
                throw (ClientNotSubscribedException) failureException;
            }
            if (failureException instanceof ServiceDownException) {
                throw (ServiceDownException) failureException;
            }
            logger.error("Unexpected PubSubException thrown: ", failureException);
            // Throw a generic ServiceDownException but wrap the
            // original PubSubException within it.
            throw new ServiceDownException(failureException);
        }
    }

    // Private method that holds the common logic for doing asynchronous
    // Subscribe or Unsubscribe requests. This is for code reuse since these two
    // flows are very similar. The assumption is that the input OperationType is
    // either SUBSCRIBE or UNSUBSCRIBE.
    private void asyncSubUnsub(ByteString topic, ByteString subscriberId,
                               Callback<ResponseBody> callback, Object context,
                               OperationType operationType, SubscriptionOptions options) {
        logSubUnsubRequest("async", topic, subscriberId, operationType, options);
        if (OperationType.SUBSCRIBE.equals(operationType)) {
            // Fall back to the configured message bound when the caller did
            // not supply a positive one. Subscribe options must be non-null.
            if (options.getMessageBound() <= 0 &&
                cfg.getSubscriptionMessageBound() > 0) {
                SubscriptionOptions.Builder soBuilder =
                    SubscriptionOptions.newBuilder(options).setMessageBound(
                        cfg.getSubscriptionMessageBound());
                options = soBuilder.build();
            }
        }
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType,
                                               options, callback, context);
        channelManager.submitOp(pubSubData);
    }

    /** Synchronous subscribe as a local (non-hub) subscriber using only a create/attach mode. */
    public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
        throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
        InvalidSubscriberIdException {
        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
        subscribe(topic, subscriberId, options, false);
    }

    /** Synchronous subscribe as a local (non-hub) subscriber with explicit options. */
    public void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options)
        throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
        InvalidSubscriberIdException {
        subscribe(topic, subscriberId, options, false);
    }

    /**
     * Synchronous subscribe for either a local or a hub subscriber.
     *
     * @throws InvalidSubscriberIdException
     *          if the subscriberId format does not match the isHub flag.
     */
    protected void subscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options, boolean isHub)
        throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
        InvalidSubscriberIdException {
        // Validate that the format of the subscriberId is valid either as a
        // local or hub subscriber.
        if (!isValidSubscriberId(subscriberId, isHub)) {
            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
                                                   + ", isHub: " + isHub);
        }
        try {
            subUnsub(topic, subscriberId, OperationType.SUBSCRIBE, options);
        } catch (ClientNotSubscribedException e) {
            logger.error("Unexpected Exception thrown: ", e);
            // This exception should never be thrown here. But just in case,
            // throw a generic ServiceDownException but wrap the original
            // Exception within it.
            throw new ServiceDownException(e);
        }
    }

    /** Asynchronous subscribe as a local (non-hub) subscriber using only a create/attach mode. */
    public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
                               Object context) {
        SubscriptionOptions options = SubscriptionOptions.newBuilder().setCreateOrAttach(mode).build();
        asyncSubscribe(topic, subscriberId, options, callback, context, false);
    }

    /** Asynchronous subscribe as a local (non-hub) subscriber with explicit options. */
    public void asyncSubscribe(ByteString topic, ByteString subscriberId, SubscriptionOptions options,
                               Callback<Void> callback, Object context) {
        asyncSubscribe(topic, subscriberId, options, callback, context, false);
    }

    /**
     * Asynchronous subscribe for either a local or a hub subscriber. An
     * invalid subscriberId is reported through the callback as a
     * ServiceDownException wrapping an InvalidSubscriberIdException.
     */
    protected void asyncSubscribe(ByteString topic, ByteString subscriberId,
                                  SubscriptionOptions options,
                                  Callback<Void> callback, Object context, boolean isHub) {
        // Validate that the format of the subscriberId is valid either as a
        // local or hub subscriber.
        if (!isValidSubscriberId(subscriberId, isHub)) {
            callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
                                         "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
            return;
        }
        asyncSubUnsub(topic, subscriberId,
                      new VoidCallbackAdapter<ResponseBody>(callback), context,
                      OperationType.SUBSCRIBE, options);
    }

    /** Synchronous unsubscribe as a local (non-hub) subscriber. */
    public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
        ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
        unsubscribe(topic, subscriberId, false);
    }

    /**
     * Synchronous unsubscribe for either a local or a hub subscriber.
     *
     * @throws InvalidSubscriberIdException
     *          if the subscriberId format does not match the isHub flag.
     */
    protected void unsubscribe(ByteString topic, ByteString subscriberId, boolean isHub)
        throws CouldNotConnectException, ClientNotSubscribedException, ServiceDownException,
        InvalidSubscriberIdException {
        // Validate that the format of the subscriberId is valid either as a
        // local or hub subscriber.
        if (!isValidSubscriberId(subscriberId, isHub)) {
            throw new InvalidSubscriberIdException("SubscriberId passed is not valid: " + subscriberId.toStringUtf8()
                                                   + ", isHub: " + isHub);
        }
        // Synchronously close the subscription on the client side. Even
        // if the unsubscribe request to the server errors out, we won't be
        // delivering messages for this subscription to the client. The client
        // can later retry the unsubscribe request to the server so they are
        // "fully" unsubscribed from the given topic.
        closeSubscription(topic, subscriberId);
        try {
            subUnsub(topic, subscriberId, OperationType.UNSUBSCRIBE, null);
        } catch (ClientAlreadySubscribedException e) {
            logger.error("Unexpected Exception thrown: ", e);
            // This exception should never be thrown here. But just in case,
            // throw a generic ServiceDownException but wrap the original
            // Exception within it.
            throw new ServiceDownException(e);
        }
    }

    /** Asynchronous unsubscribe as a local (non-hub) subscriber. */
    public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
                                 final Callback<Void> callback, final Object context) {
        doAsyncUnsubscribe(topic, subscriberId,
                           new VoidCallbackAdapter<ResponseBody>(callback),
                           context, false);
    }

    /** Asynchronous unsubscribe for either a local or a hub subscriber. */
    protected void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
                                    final Callback<Void> callback, final Object context, boolean isHub) {
        doAsyncUnsubscribe(topic, subscriberId,
                           new VoidCallbackAdapter<ResponseBody>(callback),
                           context, isHub);
    }

    // Validates the subscriberId, closes the subscription asynchronously and,
    // once that succeeds, posts the asynchronous unsubscribe request.
    private void doAsyncUnsubscribe(final ByteString topic, final ByteString subscriberId,
                                    final Callback<ResponseBody> callback,
                                    final Object context, boolean isHub) {
        // Validate that the format of the subscriberId is valid either as a
        // local or hub subscriber.
        if (!isValidSubscriberId(subscriberId, isHub)) {
            callback.operationFailed(context, new ServiceDownException(new InvalidSubscriberIdException(
                                         "SubscriberId passed is not valid: " + subscriberId.toStringUtf8() + ", isHub: " + isHub)));
            return;
        }
        // Asynchronously close the subscription. On the callback to that
        // operation once it completes, post the async unsubscribe request.
        doAsyncCloseSubscription(topic, subscriberId, new Callback<ResponseBody>() {
            @Override
            public void operationFinished(Object ctx, ResponseBody resultOfOperation) {
                asyncSubUnsub(topic, subscriberId, callback, context, OperationType.UNSUBSCRIBE, null);
            }

            @Override
            public void operationFailed(Object ctx, PubSubException exception) {
                callback.operationFailed(context, exception);
            }
        }, null);
    }

    // This is a helper method to determine if a subscriberId is valid as either
    // a hub or local subscriber: the id must look like a hub subscriber id
    // exactly when isHub is set.
    private boolean isValidSubscriberId(ByteString subscriberId, boolean isHub) {
        return isHub == SubscriptionStateUtils.isHubSubscriber(subscriberId);
    }

    /**
     * Send a consume message for the given subscription.
     *
     * @throws ClientNotSubscribedException
     *          if no client-side subscription exists for the topic subscriber.
     */
    public void consume(ByteString topic, ByteString subscriberId, MessageSeqId messageSeqId)
        throws ClientNotSubscribedException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Calling consume for {}, messageSeqId: {}.",
                     topicSubscriber, messageSeqId);

        SubscribeResponseHandler subscribeResponseHandler =
            channelManager.getSubscribeResponseHandler(topicSubscriber);
        // Check that this topic subscription on the client side exists.
        if (null == subscribeResponseHandler ||
            !subscribeResponseHandler.hasSubscription(topicSubscriber)) {
            throw new ClientNotSubscribedException(
                "Cannot send consume message since client is not subscribed to topic: "
                + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
        }
        // Send the consume message to the server using the same subscribe
        // channel that the topic subscription uses.
        subscribeResponseHandler.consume(topicSubscriber, messageSeqId);
    }

    /**
     * Report whether the client currently holds a subscription for the given
     * topic and subscriber.
     *
     * @return true when a subscribe response handler exists for the topic
     *         subscriber and reports that it has the subscription.
     */
    public boolean hasSubscription(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
        ServiceDownException {
        // The subscription type of info should be stored on the server end, not
        // the client side. Eventually, the server will have the Subscription
        // Manager part that ties into Zookeeper to manage this info.
        // The client side just has soft memory state for client subscription
        // information, which is what is consulted here.
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        SubscribeResponseHandler subscribeResponseHandler =
            channelManager.getSubscribeResponseHandler(topicSubscriber);
        return null != subscribeResponseHandler
               && subscribeResponseHandler.hasSubscription(topicSubscriber);
    }

    /**
     * Not implemented on the client side.
     *
     * @return always null.
     */
    public List<ByteString> getSubscriptionList(ByteString subscriberId) throws CouldNotConnectException,
        ServiceDownException {
        // Same as the previous hasSubscription method, this data should reside
        // on the server end, not the client side.
        return null;
    }

    /** Start delivering messages of the subscription to the given handler. */
    public void startDelivery(final ByteString topic, final ByteString subscriberId,
                              MessageHandler messageHandler)
        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Starting delivery for {}.", topicSubscriber);
        channelManager.startDelivery(topicSubscriber, messageHandler);
    }

    /**
     * Start delivering messages of the subscription to the given handler,
     * wrapped in a FilterableMessageHandler together with the given filter.
     *
     * @throws NullPointerException
     *          if the message handler or the message filter is null.
     */
    public void startDeliveryWithFilter(final ByteString topic, final ByteString subscriberId,
                                        MessageHandler messageHandler,
                                        ClientMessageFilter messageFilter)
        throws ClientNotSubscribedException, AlreadyStartDeliveryException {
        if (null == messageHandler || null == messageFilter) {
            throw new NullPointerException("Null message handler or message filter is provided.");
        }
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        messageHandler = new FilterableMessageHandler(messageHandler, messageFilter);
        logger.debug("Starting delivery with filter for {}.", topicSubscriber);
        channelManager.startDelivery(topicSubscriber, messageHandler);
    }

    /** Stop delivering messages of the subscription. */
    public void stopDelivery(final ByteString topic, final ByteString subscriberId)
        throws ClientNotSubscribedException {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Stopping delivery for {}.", topicSubscriber);
        channelManager.stopDelivery(topicSubscriber);
    }

    /**
     * Close the subscription and block until the asynchronous close completes.
     *
     * @throws ServiceDownException
     *          if the close failed or the waiting thread was interrupted.
     */
    public void closeSubscription(ByteString topic, ByteString subscriberId) throws ServiceDownException {
        // This PubSubData only serves as the monitor and completion flag for
        // the synchronous wait.
        PubSubData pubSubData = new PubSubData(topic, null, subscriberId, null, null, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            doAsyncCloseSubscription(topic, subscriberId, pubSubCallback, null);
            try {
                // Loop to guard against spurious wakeups.
                while (!pubSubData.isDone) {
                    pubSubData.wait();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so that code further up the
                // stack can still observe that the thread was interrupted.
                Thread.currentThread().interrupt();
                throw new ServiceDownException("Interrupted Exception while waiting for asyncCloseSubscription call");
            }
            // Check from the PubSubCallback if it was successful or not.
            if (!pubSubCallback.getIsCallSuccessful()) {
                // The exception thrown below carries only a message, so log
                // the underlying failure before it is lost.
                logger.error("Sync close subscription operation failed: ",
                             pubSubCallback.getFailureException());
                throw new ServiceDownException("Exception while trying to close the subscription for topic: "
                                               + topic.toStringUtf8() + ", subscriberId: " + subscriberId.toStringUtf8());
            }
        }
    }

    /** Close the subscription asynchronously, reporting only success or failure. */
    public void asyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
                                       final Callback<Void> callback, final Object context) {
        doAsyncCloseSubscription(topic, subscriberId,
                                 new VoidCallbackAdapter<ResponseBody>(callback), context);
    }

    // Stops delivery for the subscription and then asks the channel manager
    // to close it asynchronously.
    private void doAsyncCloseSubscription(final ByteString topic, final ByteString subscriberId,
                                          final Callback<ResponseBody> callback, final Object context) {
        TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
        logger.debug("Stopping delivery for {} before closing subscription.", topicSubscriber);
        // We only stop delivery here, not in the channel manager, because
        // channelManager#asyncCloseSubscription will also be called when the
        // subscription channel is disconnected to clear the local subscription.
        try {
            channelManager.stopDelivery(topicSubscriber);
        } catch (ClientNotSubscribedException ignored) {
            // it is OK to ignore the exception when closing subscription
        }
        logger.debug("Closing subscription asynchronously for {}.", topicSubscriber);
        channelManager.asyncCloseSubscription(topicSubscriber, callback, context);
    }
}
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hedwig.client.netty; - -import java.net.InetSocketAddress; - -import org.jboss.netty.channel.Channel; - -import org.apache.hedwig.client.data.PubSubData; -import org.apache.hedwig.client.data.TopicSubscriber; -import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest; -import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId; -import org.apache.hedwig.protocol.PubSubProtocol.OperationType; -import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest; -import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest; -import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion; -import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions; -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences; -import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest; - -/** - * Utilities for network operations. - */ -public class NetUtils { - - /** - * Helper static method to get the String Hostname:Port from a netty - * Channel. Assumption is that the netty Channel was originally created with - * an InetSocketAddress. 
This is true with the Hedwig netty implementation. - * - * @param channel - * Netty channel to extract the hostname and port from. - * @return String representation of the Hostname:Port from the Netty Channel - */ - public static InetSocketAddress getHostFromChannel(Channel channel) { - return (InetSocketAddress) channel.getRemoteAddress(); - } - - /** - * This is a helper method to build the actual pub/sub message. - * - * @param txnId - * Transaction Id. - * @param pubSubData - * Publish call's data wrapper object. - * @return pub sub request to send - */ - public static PubSubRequest.Builder buildPubSubRequest(long txnId, - PubSubData pubSubData) { - // Create a PubSubRequest - PubSubRequest.Builder pubsubRequestBuilder = PubSubRequest.newBuilder(); - pubsubRequestBuilder.setProtocolVersion(ProtocolVersion.VERSION_ONE); - pubsubRequestBuilder.setType(pubSubData.operationType); - // for consume request, we don't need to care about tried servers list - if (OperationType.CONSUME != pubSubData.operationType) { - if (pubSubData.triedServers != null && pubSubData.triedServers.size() > 0) { - pubsubRequestBuilder.addAllTriedServers(pubSubData.triedServers); - } - } - pubsubRequestBuilder.setTxnId(txnId); - pubsubRequestBuilder.setShouldClaim(pubSubData.shouldClaim); - pubsubRequestBuilder.setTopic(pubSubData.topic); - - switch (pubSubData.operationType) { - case PUBLISH: - // Set the PublishRequest into the outer PubSubRequest - pubsubRequestBuilder.setPublishRequest(buildPublishRequest(pubSubData)); - break; - case SUBSCRIBE: - // Set the SubscribeRequest into the outer PubSubRequest - pubsubRequestBuilder.setSubscribeRequest(buildSubscribeRequest(pubSubData)); - break; - case UNSUBSCRIBE: - // Set the UnsubscribeRequest into the outer PubSubRequest - pubsubRequestBuilder.setUnsubscribeRequest(buildUnsubscribeRequest(pubSubData)); - break; - case CLOSESUBSCRIPTION: - // Set the CloseSubscriptionRequest into the outer PubSubRequest - 
pubsubRequestBuilder.setCloseSubscriptionRequest( - buildCloseSubscriptionRequest(pubSubData)); - break; - default: - throw new IllegalArgumentException("Unknown argument type " + pubSubData.operationType); - } - - // Update the PubSubData with the txnId and the requestWriteTime - pubSubData.txnId = txnId; - pubSubData.requestWriteTime = System.currentTimeMillis(); - - return pubsubRequestBuilder; - } - - // build publish request - private static PublishRequest.Builder buildPublishRequest(PubSubData pubSubData) { - PublishRequest.Builder publishRequestBuilder = PublishRequest.newBuilder(); - publishRequestBuilder.setMsg(pubSubData.msg); - return publishRequestBuilder; - } - - // build subscribe request - private static SubscribeRequest.Builder buildSubscribeRequest(PubSubData pubSubData) { SubscribeRequest.Builder subscribeRequestBuilder = SubscribeRequest.newBuilder(); - subscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId); - subscribeRequestBuilder.setCreateOrAttach(pubSubData.options.getCreateOrAttach()); - subscribeRequestBuilder.setForceAttach(pubSubData.options.getForceAttach()); - // For now, all subscribes should wait for all cross-regional - // subscriptions to be established before returning. 
- subscribeRequestBuilder.setSynchronous(true); - // set subscription preferences - SubscriptionPreferences.Builder preferencesBuilder = - options2Preferences(pubSubData.options); - // backward compatable with 4.1.0 - if (preferencesBuilder.hasMessageBound()) { - subscribeRequestBuilder.setMessageBound(preferencesBuilder.getMessageBound()); - } - subscribeRequestBuilder.setPreferences(preferencesBuilder); - return subscribeRequestBuilder; - } - - // build unsubscribe request - private static UnsubscribeRequest.Builder buildUnsubscribeRequest(PubSubData pubSubData) { - // Create the UnSubscribeRequest - UnsubscribeRequest.Builder unsubscribeRequestBuilder = UnsubscribeRequest.newBuilder(); - unsubscribeRequestBuilder.setSubscriberId(pubSubData.subscriberId); - return unsubscribeRequestBuilder; - } - - // build closesubscription request - private static CloseSubscriptionRequest.Builder - buildCloseSubscriptionRequest(PubSubData pubSubData) { - // Create the CloseSubscriptionRequest - CloseSubscriptionRequest.Builder closeSubscriptionRequestBuilder = - CloseSubscriptionRequest.newBuilder(); - closeSubscriptionRequestBuilder.setSubscriberId(pubSubData.subscriberId); - return closeSubscriptionRequestBuilder; - } - - /** - * Build consume request - * - * @param txnId - * Transaction Id. - * @param topicSubscriber - * Topic Subscriber. - * @param messageSeqId - * Message Seq Id. - * @return pub/sub request. 
*/
    public static PubSubRequest.Builder buildConsumeRequest(long txnId,
                                                            TopicSubscriber topicSubscriber,
                                                            MessageSeqId messageSeqId) {
        // Outer request: protocol version, operation type, transaction id and topic.
        PubSubRequest.Builder request = PubSubRequest.newBuilder()
            .setProtocolVersion(ProtocolVersion.VERSION_ONE)
            .setType(OperationType.CONSUME)
            .setTxnId(txnId)
            .setTopic(topicSubscriber.getTopic());

        // Consume payload: the subscriber id and the message sequence id.
        ConsumeRequest.Builder consume = ConsumeRequest.newBuilder()
            .setSubscriberId(topicSubscriber.getSubscriberId())
            .setMsgId(messageSeqId);

        return request.setConsumeRequest(consume);
    }

    /**
     * Translates client-side subscription options into subscription preferences.
     * A value is copied only when its guard below holds; everything else is left
     * unset on the returned builder.
     *
     * @param options
     *            Client-side subscription options.
     * @return subscription preferences builder.
     */
    private static SubscriptionPreferences.Builder options2Preferences(SubscriptionOptions options) {
        SubscriptionPreferences.Builder prefs = SubscriptionPreferences.newBuilder();

        // Message bound: copied only when positive.
        if (options.getMessageBound() > 0) {
            prefs.setMessageBound(options.getMessageBound());
        }

        // Message filter: copied when present.
        if (options.hasMessageFilter()) {
            prefs.setMessageFilter(options.getMessageFilter());
        }

        // User options: copied when present.
        if (options.hasOptions()) {
            prefs.setOptions(options.getOptions());
        }

        // Message window size: copied only when present and positive.
        if (options.hasMessageWindowSize()) {
            if (options.getMessageWindowSize() > 0) {
                prefs.setMessageWindowSize(options.getMessageWindowSize());
            }
        }

        return prefs;
    }

}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java
---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java deleted file mode 100644 index ffe8661..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/SubscriptionEventEmitter.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hedwig.client.netty; - -import java.util.concurrent.CopyOnWriteArraySet; - -import com.google.protobuf.ByteString; - -import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent; -import org.apache.hedwig.util.SubscriptionListener; - -public class SubscriptionEventEmitter { - - private final CopyOnWriteArraySet<SubscriptionListener> listeners; - - public SubscriptionEventEmitter() { - listeners = new CopyOnWriteArraySet<SubscriptionListener>(); - } - - public void addSubscriptionListener(SubscriptionListener listener) { - listeners.add(listener); - } - - public void removeSubscriptionListener(SubscriptionListener listener) { - listeners.remove(listener); - } - - public void emitSubscriptionEvent(ByteString topic, ByteString subscriberId, - SubscriptionEvent event) { - for (SubscriptionListener listener : listeners) { - listener.processEvent(topic, subscriberId, event); - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java ---------------------------------------------------------------------- diff --git a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java b/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java deleted file mode 100644 index dc2cf8b..0000000 --- a/hedwig-client/src/main/java/org/apache/hedwig/client/netty/VoidCallbackAdapter.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hedwig.client.netty;

import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.util.Callback;

/**
 * Lets a {@code Callback<Void>} be used where a {@code Callback<T>} is expected.
 * The typed result of a successful operation is ignored: the wrapped callback is
 * notified with {@code null} instead. Failures are passed through unchanged.
 *
 * @param <T>
 *            Result type of the callback being adapted.
 */
public class VoidCallbackAdapter<T> implements Callback<T> {

    /** Wrapped callback that receives every notification. */
    private final Callback<Void> voidCallback;

    /**
     * @param delegate
     *            Callback to notify on completion or failure.
     */
    public VoidCallbackAdapter(Callback<Void> delegate) {
        voidCallback = delegate;
    }

    /**
     * Reports success to the wrapped callback, dropping {@code resultOfOperation}.
     */
    @Override
    public void operationFinished(Object ctx, T resultOfOperation) {
        voidCallback.operationFinished(ctx, null);
    }

    /**
     * Reports the failure to the wrapped callback with the same context and exception.
     */
    @Override
    public void operationFailed(Object ctx, PubSubException exception) {
        voidCallback.operationFailed(ctx, exception);
    }
}