// http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/ServerRpcProvider.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/ServerRpcProvider.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/ServerRpcProvider.java deleted file mode 100755 index b053870..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/ServerRpcProvider.java +++ /dev/null @@ -1,826 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.waveprotocol.box.server.rpc;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import com.google.inject.servlet.GuiceFilter;
import com.google.inject.servlet.GuiceServletContextListener;
import com.google.inject.servlet.ServletModule;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.Service;
import com.typesafe.config.Config;
import org.apache.commons.lang.StringUtils;
import org.atmosphere.cache.UUIDBroadcasterCache;
import org.atmosphere.config.service.AtmosphereHandlerService;
import org.atmosphere.cpr.*;
import org.atmosphere.guice.AtmosphereGuiceServlet;
import org.atmosphere.util.IOUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
//import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.server.session.HashSessionManager;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.eclipse.jetty.util.resource.ResourceCollection;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.webapp.WebAppContext;
import org.eclipse.jetty.websocket.servlet.*;
import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticate;
import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticationResult;
import org.waveprotocol.box.server.authentication.SessionManager;
import org.waveprotocol.box.server.executor.ExecutorAnnotations.ClientServerExecutor;
import org.waveprotocol.box.server.persistence.file.FileUtils;
import org.waveprotocol.box.server.rpc.atmosphere.AtmosphereChannel;
import org.waveprotocol.box.server.rpc.atmosphere.AtmosphereClientInterceptor;
import org.waveprotocol.box.server.util.NetUtils;
import org.waveprotocol.box.stat.Timer;
import org.waveprotocol.box.stat.Timing;
import org.waveprotocol.wave.model.util.Pair;
import org.waveprotocol.wave.model.wave.ParticipantId;
import org.waveprotocol.wave.util.logging.Log;

import javax.annotation.Nullable;
import javax.servlet.DispatcherType;
import javax.servlet.Filter;
import javax.servlet.ServletContextListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpSession;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/**
 * ServerRpcProvider can provide instances of type Service over an incoming
 * network socket and service incoming RPCs to these services and their methods.
 *
 * <p>It owns the embedded Jetty {@link Server}, the servlet/filter registries that
 * are bound through Guice when the server starts, and the table mapping protocol
 * buffer request types to the service method that handles them.
 */
public class ServerRpcProvider {
  private static final Log LOG = Log.get(ServerRpcProvider.class);

  /**
   * The buffer size is passed to implementations of {@link WaveWebSocketServlet} as init
   * param. It defines the response buffer size.
   */
  private static final int BUFFER_SIZE = 1024 * 1024;

  private final InetSocketAddress[] httpAddresses;
  private final Executor threadPool;
  private final SessionManager sessionManager;
  private final org.eclipse.jetty.server.SessionManager jettySessionManager;
  private Server httpServer = null;
  private final boolean sslEnabled;
  private final String sslKeystorePath;
  private final String sslKeystorePassword;

  // Mapping from incoming protocol buffer type -> specific handler.
  // Not a concurrent map: every access must hold the map's own monitor.
  private final Map<Descriptors.Descriptor, RegisteredServiceMethod> registeredServices =
      Maps.newHashMap();

  // List of webApp source directories ("./war", etc)
  private final String[] resourceBases;

  private final String sessionStoreDir;

  /**
   * Internal, static container class for any specific registered service
   * method.
   */
  static class RegisteredServiceMethod {
    final Service service;
    final MethodDescriptor method;

    RegisteredServiceMethod(Service service, MethodDescriptor method) {
      this.service = service;
      this.method = method;
    }
  }

  /** A {@link Connection} whose messages travel over a {@link WebSocketChannel}. */
  static class WebSocketConnection extends Connection {
    private final WebSocketChannel socketChannel;

    WebSocketConnection(ParticipantId loggedInUser, ServerRpcProvider provider) {
      super(loggedInUser, provider);
      socketChannel = new WebSocketChannelImpl(this);
      LOG.info("New websocket connection set up for user " + loggedInUser);
      expectMessages(socketChannel);
    }

    @Override
    protected void sendMessage(int sequenceNo, Message message) {
      socketChannel.sendMessage(sequenceNo, message);
    }

    public WebSocketChannel getWebSocketServerChannel() {
      return socketChannel;
    }
  }

  /** A {@link Connection} whose messages travel over an {@link AtmosphereChannel}. */
  static class AtmosphereConnection extends Connection {

    private final AtmosphereChannel atmosphereChannel;

    public AtmosphereConnection(ParticipantId loggedInUser, ServerRpcProvider provider) {
      super(loggedInUser, provider);
      atmosphereChannel = new AtmosphereChannel(this);
      expectMessages(atmosphereChannel);
    }

    @Override
    protected void sendMessage(int sequenceNo, Message message) {
      atmosphereChannel.sendMessage(sequenceNo, message);
    }

    public AtmosphereChannel getAtmosphereChannel() {
      return atmosphereChannel;
    }
  }

  /**
   * Transport-independent part of a client connection: receives decoded
   * messages, dispatches them to the registered service methods and tracks the
   * RPCs that are still in flight, keyed by sequence number.
   */
  static abstract class Connection implements ProtoCallback {
    private final Map<Integer, ServerRpcController> activeRpcs =
        new ConcurrentHashMap<>();

    // The logged in user.
    // Note: Due to this bug:
    // http://code.google.com/p/wave-protocol/issues/detail?id=119,
    // the field may be null on first connect and then set later using an RPC.
    private ParticipantId loggedInUser;

    private final ServerRpcProvider provider;

    /**
     * @param loggedInUser The currently logged in user, or null if no user is
     *        logged in.
     * @param provider the provider
     */
    public Connection(ParticipantId loggedInUser, ServerRpcProvider provider) {
      this.loggedInUser = loggedInUser;
      this.provider = provider;
    }

    /**
     * Tells the channel which message types to expect: the request prototype of
     * every registered service method, plus {@code Rpc.CancelRpc}.
     */
    protected void expectMessages(MessageExpectingChannel channel) {
      synchronized (provider.registeredServices) {
        for (RegisteredServiceMethod serviceMethod : provider.registeredServices.values()) {
          channel.expectMessage(serviceMethod.service.getRequestPrototype(serviceMethod.method));
          LOG.fine("Expecting: " + serviceMethod.method.getFullName());
        }
      }
      channel.expectMessage(Rpc.CancelRpc.getDefaultInstance());
    }

    /** Sends a message to the client, tagged with the RPC's sequence number. */
    protected abstract void sendMessage(int sequenceNo, Message message);

    private ParticipantId authenticate(String token) {
      HttpSession session = provider.sessionManager.getSessionFromToken(token);
      return provider.sessionManager.getLoggedInUser(session);
    }

    /**
     * Handles one incoming message: cancels an active RPC, authenticates the
     * connection, or starts a new RPC on the provider's executor.
     *
     * @throws IllegalStateException if a cancel targets an inactive RPC, if the
     *         sequence number is already in use, if the connection is already
     *         authenticated as a different user, or if no service is registered
     *         for the message type
     * @throws IllegalArgumentException if an authentication token is invalid
     */
    @Override
    public void message(final int sequenceNo, Message message) {
      final String messageName = "/" + message.getClass().getSimpleName();
      // NOTE(review): this timer is only stopped from the RPC callback below; the
      // cancel and authenticate paths never stop it. Confirm against the Timing
      // API whether that is intended.
      final Timer profilingTimer = Timing.startRequest(messageName);

      if (message instanceof Rpc.CancelRpc) {
        final ServerRpcController controller = activeRpcs.get(sequenceNo);
        if (controller == null) {
          throw new IllegalStateException("Trying to cancel an RPC that is not active!");
        }
        LOG.info("Cancelling open RPC " + sequenceNo);
        controller.cancel();
        return;
      }

      if (message instanceof ProtocolAuthenticate) {
        // Workaround for bug: http://codereview.waveprotocol.org/224001/

        // When we get this message, either the connection will not be logged in
        // (loggedInUser == null) or the connection will have been authenticated
        // via cookies (in which case loggedInUser must match the authenticated
        // user, and this message has no effect).
        ProtocolAuthenticate authMessage = (ProtocolAuthenticate) message;
        ParticipantId authenticatedAs = authenticate(authMessage.getToken());

        Preconditions.checkArgument(authenticatedAs != null, "Auth token invalid");
        Preconditions.checkState(loggedInUser == null || loggedInUser.equals(authenticatedAs),
            "Session already authenticated as a different user");

        loggedInUser = authenticatedAs;
        LOG.info("Session authenticated as " + loggedInUser);
        sendMessage(sequenceNo, ProtocolAuthenticationResult.getDefaultInstance());
        return;
      }

      // Single lookup under the registry lock (registerService mutates the map
      // under the same lock).
      final RegisteredServiceMethod serviceMethod =
          provider.lookupServiceMethod(message.getDescriptorForType());
      if (serviceMethod == null) {
        // Sent a message type we understand, but don't expect - erroneous case!
        throw new IllegalStateException(
            "Got expected but unknown message (" + message + ") for sequence: " + sequenceNo);
      }
      if (activeRpcs.containsKey(sequenceNo)) {
        throw new IllegalStateException(
            "Can't invoke a new RPC with a sequence number already in use.");
      }

      // Create the internal ServerRpcController used to invoke the call.
      final ServerRpcController controller =
          new ServerRpcControllerImpl(message, serviceMethod.service, serviceMethod.method,
              loggedInUser, new RpcCallback<Message>() {
                @Override
                synchronized public void run(Message response) {
                  boolean finished = response instanceof Rpc.RpcFinished;
                  if (finished
                      || !serviceMethod.method.getOptions().getExtension(Rpc.isStreamingRpc)) {
                    // This RPC is over - remove it from the map.
                    boolean failed = finished && ((Rpc.RpcFinished) response).getFailed();
                    LOG.fine("RPC " + sequenceNo + " is now finished, failed = " + failed);
                    if (failed) {
                      LOG.info("error = " + ((Rpc.RpcFinished) response).getErrorText());
                    }
                    activeRpcs.remove(sequenceNo);
                  }
                  sendMessage(sequenceNo, response);
                  if (profilingTimer != null) {
                    Timing.stop(profilingTimer);
                  }
                }
              });

      // Hand the RPC over to the executor.
      activeRpcs.put(sequenceNo, controller);
      provider.threadPool.execute(controller);
    }
  }

  /**
   * Construct a new ServerRpcProvider, hosting on the specified
   * WebSocket addresses.
   *
   * Also accepts an Executor used to run the RPC controllers.
   */
  public ServerRpcProvider(InetSocketAddress[] httpAddresses,
      String[] resourceBases, Executor threadPool, SessionManager sessionManager,
      org.eclipse.jetty.server.SessionManager jettySessionManager, String sessionStoreDir,
      boolean sslEnabled, String sslKeystorePath, String sslKeystorePassword) {
    this.httpAddresses = httpAddresses;
    this.resourceBases = resourceBases;
    this.threadPool = threadPool;
    this.sessionManager = sessionManager;
    this.jettySessionManager = jettySessionManager;
    this.sessionStoreDir = sessionStoreDir;
    this.sslEnabled = sslEnabled;
    this.sslKeystorePath = sslKeystorePath;
    this.sslKeystorePassword = sslKeystorePassword;
  }

  /**
   * Constructs a new ServerRpcProvider. Same as the constructor above, with the
   * executor passed as the last argument.
   */
  public ServerRpcProvider(InetSocketAddress[] httpAddresses,
      String[] resourceBases, SessionManager sessionManager,
      org.eclipse.jetty.server.SessionManager jettySessionManager, String sessionStoreDir,
      boolean sslEnabled, String sslKeystorePath, String sslKeystorePassword,
      Executor executor) {
    this(httpAddresses, resourceBases, executor,
        sessionManager, jettySessionManager, sessionStoreDir, sslEnabled, sslKeystorePath,
        sslKeystorePassword);
  }

  /**
   * Guice entry point: reads addresses, resource bases, session store directory
   * and SSL settings from the given configuration.
   */
  @Inject
  public ServerRpcProvider(Config config,
      SessionManager sessionManager, org.eclipse.jetty.server.SessionManager jettySessionManager,
      @ClientServerExecutor Executor executorService) {
    this(parseAddressList(config.getStringList("core.http_frontend_addresses"),
        config.getString("core.http_websocket_public_address")),
        config.getStringList("core.resource_bases").toArray(new String[0]),
        sessionManager,
        jettySessionManager,
        config.getString("core.sessions_store_directory"),
        config.getBoolean("security.enable_ssl"),
        config.getString("security.ssl_keystore_path"),
        config.getString("security.ssl_keystore_password"),
        executorService);
  }

  /**
   * Creates the Jetty server, installs connectors, servlets and filters, starts
   * it and then restores persisted sessions. Start-up failures are logged, not
   * rethrown.
   *
   * @param injector parent injector; the servlet module is installed in a child
   *        injector created from it
   */
  public void startWebSocketServer(final Injector injector) {
    httpServer = new Server();

    List<Connector> connectors = getSelectChannelConnectors(httpAddresses);
    if (connectors.isEmpty()) {
      LOG.severe("No valid http end point address provided!");
    }
    for (Connector connector : connectors) {
      httpServer.addConnector(connector);
    }
    final WebAppContext context = new WebAppContext();

    context.setParentLoaderPriority(true);

    if (jettySessionManager != null) {
      // This disables JSessionIDs in URLs redirects
      // see: http://stackoverflow.com/questions/7727534/how-do-you-disable-jsessionid-for-jetty-running-with-the-eclipse-jetty-maven-plu
      // and: http://jira.codehaus.org/browse/JETTY-467?focusedCommentId=114884&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-114884
      jettySessionManager.setSessionIdPathParameterName(null);

      context.getSessionHandler().setSessionManager(jettySessionManager);
    }
    final ResourceCollection resources = new ResourceCollection(resourceBases);
    context.setBaseResource(resources);

    addWebSocketServlets();

    try {
      final ServletModule servletModule = getServletModule();

      ServletContextListener contextListener = new GuiceServletContextListener() {

        private final Injector childInjector = injector.createChildInjector(servletModule);

        @Override
        protected Injector getInjector() {
          return childInjector;
        }
      };

      context.addEventListener(contextListener);
      context.addFilter(GuiceFilter.class, "/*", EnumSet.allOf(DispatcherType.class));
      context.addFilter(GzipFilter.class, "/webclient/*", EnumSet.allOf(DispatcherType.class));
      httpServer.setHandler(context);

      httpServer.start();
      restoreSessions();

    } catch (Exception e) { // yes, .start() throws "Exception"
      LOG.severe("Fatal error starting http server.", e);
      return;
    }
    LOG.fine("WebSocket server running.");
  }

  /**
   * Points the Jetty hash session manager at the session store directory and
   * reloads persisted sessions. Best effort: failures are logged as warnings.
   */
  private void restoreSessions() {
    if (!(jettySessionManager instanceof HashSessionManager)) {
      // Covers null as well as any other session manager implementation.
      LOG.warning("Cannot restore sessions");
      return;
    }
    try {
      HashSessionManager hashSessionManager = (HashSessionManager) jettySessionManager;
      hashSessionManager.setStoreDirectory(FileUtils.createDirIfNotExists(sessionStoreDir,
          "Session persistence"));
      hashSessionManager.setSavePeriod(60);
      hashSessionManager.restoreSessions();
    } catch (Exception e) {
      // Keep the cause in the log so a broken session store can be diagnosed.
      LOG.warning("Cannot restore sessions", e);
    }
  }

  /**
   * Registers the websocket servlet, the Atmosphere servlet and the default
   * servlets that serve static content and the GWT web client.
   */
  public void addWebSocketServlets() {
    // Servlet where the websocket connection is served from.
    ServletHolder wsholder = addServlet("/socket", WaveWebSocketServlet.class);
    // TODO(zamfi): fix to let messages span frames.
    wsholder.setInitParameter("bufferSize", "" + BUFFER_SIZE);

    // Atmosphere framework. Replacement of Socket.IO
    // See https://issues.apache.org/jira/browse/WAVE-405
    ServletHolder atholder = addServlet("/atmosphere*", AtmosphereGuiceServlet.class);
    // Enable guice. See
    // https://github.com/Atmosphere/atmosphere/wiki/Configuring-Atmosphere%27s-Classes-Creation-and-Injection
    atholder.setInitParameter("org.atmosphere.cpr.objectFactory",
        "org.waveprotocol.box.server.rpc.atmosphere.GuiceAtmosphereFactory");
    atholder.setAsyncSupported(true);
    atholder.setInitOrder(0);

    // Serve the static content and GWT web client with the default servlet
    // (acts like a standard file-based web server).
    addServlet("/static/*", DefaultServlet.class);
    addServlet("/webclient/*", DefaultServlet.class);
  }

  /**
   * Builds the Guice servlet module that binds every registered servlet (as a
   * singleton) and filter to its URL pattern.
   */
  public ServletModule getServletModule() {

    return new ServletModule() {
      @Override
      protected void configureServlets() {
        // We add servlets here to override the DefaultServlet automatic registered by WebAppContext
        // in path "/" with our WaveClientServlet. Any other way to do this?
        // Related question (unanswered 08-Apr-2011)
        // http://web.archiveorange.com/archive/v/d0LdlXf1kN0OXyPNyQZp
        for (Pair<String, ServletHolder> servlet : servletRegistry) {
          String url = servlet.getFirst();
          @SuppressWarnings("unchecked")
          Class<HttpServlet> clazz = (Class<HttpServlet>) servlet.getSecond().getHeldClass();
          Map<String, String> params = servlet.getSecond().getInitParameters();
          serve(url).with(clazz, params);
          bind(clazz).in(Singleton.class);
        }
        for (Pair<String, Class<? extends Filter>> filter : filterRegistry) {
          filter(filter.first).through(filter.second);
        }
      }
    };
  }

  /**
   * Parses the configured front-end addresses plus the public websocket address
   * into distinct socket addresses. Empty entries, duplicates and unparsable
   * entries are logged and skipped.
   *
   * @return the parsed addresses; empty when {@code addressList} is null or empty
   */
  private static InetSocketAddress[] parseAddressList(List<String> addressList, String websocketAddress) {
    if (addressList == null || addressList.isEmpty()) {
      return new InetSocketAddress[0];
    }

    Set<InetSocketAddress> addresses = Sets.newHashSet();
    // We add the websocketAddress as another listening address.
    List<String> mergedAddressList = new ArrayList<>(addressList);
    if (!StringUtils.isEmpty(websocketAddress)) {
      mergedAddressList.add(websocketAddress);
    }
    for (String str : mergedAddressList) {
      if (str.length() == 0) {
        LOG.warning("Encountered empty address in http addresses list.");
        continue;
      }
      try {
        InetSocketAddress address = NetUtils.parseHttpAddress(str);
        // Set.add reports whether the element was new.
        if (!addresses.add(address)) {
          LOG.warning(
              "Ignoring duplicate address in http addresses list: Duplicate entry '" + str
                  + "' resolved to " + address.getAddress().getHostAddress());
        }
      } catch (IOException e) {
        LOG.severe("Unable to process address " + str, e);
      }
    }
    return addresses.toArray(new InetSocketAddress[addresses.size()]);
  }

  /**
   * @return a list of {@link ServerConnector}, each bound to a host:port
   *         pair from the list of addresses.
   */
  private List<Connector> getSelectChannelConnectors(
      InetSocketAddress[] httpAddresses) {
    List<Connector> list = Lists.newArrayList();
    String[] excludeCiphers = {"SSL_RSA_EXPORT_WITH_RC4_40_MD5", "SSL_RSA_EXPORT_WITH_DES40_CBC_SHA",
                               "SSL_DHE_RSA_EXPORT_WITH_DES40_CBC_SHA", "SSL_RSA_WITH_DES_CBC_SHA",
                               "SSL_DHE_RSA_WITH_DES_CBC_SHA", "TLS_DHE_RSA_WITH_AES_128_CBC_SHA",
                               "SSL_DHE_RSA_WITH_3DES_EDE_CBC_SHA", "TLS_DHE_RSA_WITH_AES_256_CBC_SHA"};
    SslContextFactory sslContextFactory = null;

    if (sslEnabled) {
      Preconditions.checkState(sslKeystorePath != null && !sslKeystorePath.isEmpty(),
          "SSL Keystore path left blank");
      Preconditions.checkState(sslKeystorePassword != null && !sslKeystorePassword.isEmpty(),
          "SSL Keystore password left blank");

      sslContextFactory = new SslContextFactory(sslKeystorePath);
      sslContextFactory.setKeyStorePassword(sslKeystorePassword);
      sslContextFactory.setRenegotiationAllowed(false);
      sslContextFactory.setExcludeCipherSuites(excludeCiphers);

      // Note: we only actually needed client auth for AuthenticationServlet.
      // Using Need instead of Want prevents web-sockets from working on
      // Chrome.
      sslContextFactory.setWantClientAuth(true);
    }

    for (InetSocketAddress address : httpAddresses) {
      ServerConnector connector;
      if (sslEnabled) {
        connector = new ServerConnector(httpServer, sslContextFactory);
      } else {
        connector = new ServerConnector(httpServer);
      }
      connector.setHost(address.getAddress().getHostAddress());
      connector.setPort(address.getPort());
      connector.setIdleTimeout(0);
      list.add(connector);
    }

    return list;
  }

  /**
   * Servlet that upgrades requests to websockets and creates one
   * {@link WebSocketConnection} per socket.
   */
  @SuppressWarnings("serial")
  @Singleton
  public static class WaveWebSocketServlet extends WebSocketServlet {

    final ServerRpcProvider provider;
    final int websocketMaxIdleTime;
    final int websocketMaxMessageSize;

    @Inject
    public WaveWebSocketServlet(ServerRpcProvider provider, Config config) {
      super();
      this.provider = provider;
      this.websocketMaxIdleTime = config.getInt("network.websocket_max_idle_time");
      this.websocketMaxMessageSize = config.getInt("network.websocket_max_message_size");
    }

    @SuppressWarnings("cast")
    @Override
    public void configure(WebSocketServletFactory factory) {
      if (websocketMaxIdleTime == 0) {
        // Jetty does not allow to set infinite timeout.
        factory.getPolicy().setIdleTimeout(Integer.MAX_VALUE);
      } else {
        factory.getPolicy().setIdleTimeout(websocketMaxIdleTime);
      }
      // The configured value is multiplied by 1024 * 1024. Do the multiplication
      // in long and clamp, so values >= 2048 cannot wrap around to a negative or
      // tiny int.
      long maxMessageBytes = (long) websocketMaxMessageSize * 1024 * 1024;
      factory.getPolicy().setMaxTextMessageSize(
          (int) Math.min((long) Integer.MAX_VALUE, maxMessageBytes));
      factory.setCreator(new WebSocketCreator() {
        @Override
        public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
          ParticipantId loggedInUser =
              provider.sessionManager.getLoggedInUser(req.getSession());

          return new WebSocketConnection(loggedInUser, provider).getWebSocketServerChannel();
        }
      });
    }
  }

  /**
   * Manage atmosphere connections and dispatch messages to
   * wave channels.
   *
   * @author pablo...@gmail.com <Pablo Ojanguren>
   *
   */
  @Singleton
  @AtmosphereHandlerService(path = "/atmosphere",
      interceptors = {AtmosphereClientInterceptor.class},
      broadcasterCache = UUIDBroadcasterCache.class)
  public static class WaveAtmosphereService implements AtmosphereHandler {

    private static final Log LOG = Log.get(WaveAtmosphereService.class);

    private static final String WAVE_CHANNEL_ATTRIBUTE = "WAVE_CHANNEL_ATTRIBUTE";
    private static final String MSG_SEPARATOR = "|";
    private static final String MSG_CHARSET = "UTF-8";

    @Inject
    public ServerRpcProvider provider;

    /**
     * Binds the resource to its wave channel (creating the channel on first
     * contact), suspends GET requests and forwards the body of POST requests to
     * the channel.
     */
    @Override
    public void onRequest(AtmosphereResource resource) throws IOException {

      AtmosphereResourceSession resourceSession =
          AtmosphereResourceSessionFactory.getDefault().getSession(resource);

      AtmosphereChannel resourceChannel =
          resourceSession.getAttribute(WAVE_CHANNEL_ATTRIBUTE, AtmosphereChannel.class);

      if (resourceChannel == null) {

        ParticipantId loggedInUser =
            provider.sessionManager.getLoggedInUser(resource.getRequest().getSession(false));

        AtmosphereConnection connection = new AtmosphereConnection(loggedInUser, provider);
        resourceChannel = connection.getAtmosphereChannel();
        resourceSession.setAttribute(WAVE_CHANNEL_ATTRIBUTE, resourceChannel);
        resourceChannel.onConnect(resource);
      }

      // Re-attach the channel's broadcaster on every request.
      resource.setBroadcaster(resourceChannel.getBroadcaster());

      String httpMethod = resource.getRequest().getMethod();

      if (httpMethod.equalsIgnoreCase("GET")) {
        resource.suspend();
      }

      if (httpMethod.equalsIgnoreCase("POST")) {
        StringBuilder b = IOUtils.readEntirely(resource);
        resourceChannel.onMessage(b.toString());
      }
    }

    /**
     * Writes broadcast messages to a suspended resource and notifies the wave
     * channel when the connection is closed.
     */
    @Override
    public void onStateChange(AtmosphereResourceEvent event) throws IOException {

      AtmosphereResponse response = event.getResource().getResponse();
      AtmosphereResource resource = event.getResource();

      if (event.isSuspended()) {

        // Set content type before do response.getWriter()
        // http://docs.oracle.com/javaee/5/api/javax/servlet/ServletResponse.html#setContentType(java.lang.String)
        response.setContentType("text/plain; charset=UTF-8");
        response.setCharacterEncoding("UTF-8");

        Object payload = event.getMessage();

        if (payload instanceof Object[]) {
          // Iterate the array's elements. Wrapping the array itself in a
          // singleton list made the String cast below fail for every array.
          Object[] elements = (Object[]) payload;
          LOG.fine("SEND MESSAGE ARRAY " + Arrays.toString(elements));
          writeSeparated(response, Arrays.asList(elements));

        } else if (payload instanceof List) {

          LOG.fine("SEND MESSAGE LIST " + payload.toString());
          writeSeparated(response, (List<?>) payload);

        } else if (payload instanceof String) {

          LOG.fine("SEND MESSAGE " + payload.toString());
          response.getOutputStream().write(((String) payload).getBytes(MSG_CHARSET));
        }

        try {

          response.flushBuffer();

          switch (resource.transport()) {
            case JSONP:
            case LONG_POLLING:
              event.getResource().resume();
              break;
            case WEBSOCKET:
            case STREAMING:
            case SSE:
              response.getOutputStream().flush();
              break;
            default:
              LOG.info("Unknown transport");
              break;
          }
        } catch (IOException e) {
          LOG.info("Error resuming resource response", e);
        }

      } else if (event.isResuming()) {

        LOG.fine("RESUMING");

      } else if (event.isResumedOnTimeout()) {

        LOG.fine("RESUMED ON TIMEOUT");

      } else if (event.isClosedByApplication() || event.isClosedByClient()) {

        LOG.fine("CONNECTION CLOSED");

        AtmosphereResourceSession resourceSession =
            AtmosphereResourceSessionFactory.getDefault().getSession(resource);

        AtmosphereChannel resourceChannel =
            resourceSession.getAttribute(WAVE_CHANNEL_ATTRIBUTE, AtmosphereChannel.class);

        if (resourceChannel != null) {
          resourceChannel.onDisconnect();
        }
      }
    }

    /**
     * Writes a leading separator, then each message followed by a separator.
     * Every element is expected to be a {@code String}.
     */
    private static void writeSeparated(AtmosphereResponse response, List<?> messages)
        throws IOException {
      response.getOutputStream().write(MSG_SEPARATOR.getBytes(MSG_CHARSET));
      for (Object element : messages) {
        String framed = (String) element + MSG_SEPARATOR;
        response.getOutputStream().write(framed.getBytes(MSG_CHARSET));
      }
    }

    @Override
    public void destroy() {
      // Nothing to do
    }
  }

  /**
   * Returns the socket the WebSocket server is listening on, or null if the
   * server has not been created or has no connector.
   */
  public SocketAddress getWebSocketAddress() {
    if (httpServer == null) {
      return null;
    }
    Connector[] connectors = httpServer.getConnectors();
    if (connectors == null || connectors.length == 0) {
      // Happens when no valid http address was configured.
      return null;
    }
    ServerConnector first = (ServerConnector) connectors[0];
    return new InetSocketAddress(first.getHost(), first.getLocalPort());
  }

  /**
   * Stops this server. Does nothing beyond logging if the server was never
   * started; failures while stopping are logged, not rethrown.
   */
  public void stopServer() throws IOException {
    if (httpServer != null) {
      try {
        httpServer.stop(); // yes, .stop() throws "Exception"
      } catch (Exception e) {
        LOG.warning("Fatal error stopping http server.", e);
      }
    }
    LOG.fine("server shutdown.");
  }

  /**
   * Register all methods provided by the given service type.
   */
  public void registerService(Service service) {
    synchronized (registeredServices) {
      for (MethodDescriptor methodDescriptor : service.getDescriptorForType().getMethods()) {
        registeredServices.put(methodDescriptor.getInputType(),
            new RegisteredServiceMethod(service, methodDescriptor));
      }
    }
  }

  /**
   * Returns the service method registered for the given request type, or null
   * if there is none. Takes the same lock as {@link #registerService}.
   */
  private RegisteredServiceMethod lookupServiceMethod(Descriptors.Descriptor requestType) {
    synchronized (registeredServices) {
      return registeredServices.get(requestType);
    }
  }

  /**
   * List of servlets
   */
  List<Pair<String, ServletHolder>> servletRegistry = Lists.newArrayList();

  /**
   * List of filters
   */
  List<Pair<String, Class<? extends Filter>>> filterRegistry = Lists.newArrayList();

  /**
   * Add a servlet to the servlet registry. This servlet will be attached to the
   * specified URL pattern when the server is started up.
   *
   * @param urlPattern the URL pattern for paths. Eg, '/foo', '/foo/*'.
   * @param servlet the servlet class to bind to the specified paths.
   * @param initParams the map with init params, can be null or empty.
   * @return the {@link ServletHolder} that holds the servlet.
   */
  public ServletHolder addServlet(String urlPattern, Class<? extends HttpServlet> servlet,
      @Nullable Map<String, String> initParams) {
    ServletHolder servletHolder = new ServletHolder(servlet);
    if (initParams != null) {
      servletHolder.setInitParameters(initParams);
    }
    servletRegistry.add(Pair.of(urlPattern, servletHolder));
    return servletHolder;
  }

  /**
   * Add a servlet to the servlet registry. This servlet will be attached to the
   * specified URL pattern when the server is started up.
   * @param urlPattern the URL pattern for paths. Eg, '/foo', '/foo/*'.
   * @param servlet the servlet class to bind to the specified paths.
   * @return the {@link ServletHolder} that holds the servlet.
   */
  public ServletHolder addServlet(String urlPattern, Class<? extends HttpServlet> servlet) {
    return addServlet(urlPattern, servlet, null);
  }

  /**
   * Add a filter to the filter registry. This filter will be attached to the
   * specified URL pattern when the server is started up.
   *
   * @param urlPattern the URL pattern for paths. Eg, '/foo', '/foo/*'.
   * @param filter the filter class to bind to the specified paths.
   */
  public void addFilter(String urlPattern, Class<? extends Filter> filter) {
    filterRegistry.add(new Pair<String, Class<? extends Filter>>(urlPattern, filter));
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/SignOutServlet.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/SignOutServlet.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/SignOutServlet.java deleted file mode 100644 index 4a08c39..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/SignOutServlet.java +++ /dev/null @@ -1,68 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.waveprotocol.box.server.rpc;

import com.google.common.base.Preconditions;
import com.google.inject.Inject;

import org.waveprotocol.box.server.authentication.SessionManager;

import java.io.IOException;

import javax.inject.Singleton;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

/**
 * A servlet for signing out the user.
 *
 * @author jose...@gmail.com (Joseph Gentle)
 */
@SuppressWarnings("serial")
@Singleton
public class SignOutServlet extends HttpServlet {
  private final SessionManager sessionManager;

  @Inject
  public SignOutServlet(SessionManager sessionManager) {
    Preconditions.checkNotNull(sessionManager, "Session manager is null");
    this.sessionManager = sessionManager;
  }

  /**
   * On GET, sign the user out and redirect them to the redirect URL.
   *
   * <p>The redirect target is taken from the {@code r} request parameter and is
   * only honoured when it is a path on this server. Otherwise a plain
   * "Logged out." page is written.
   */
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
    HttpSession session = req.getSession(false);
    sessionManager.logout(session);

    String redirectUrl = req.getParameter("r");
    if (isLocalPath(redirectUrl)) {
      resp.sendRedirect(redirectUrl);
    } else {
      resp.setStatus(HttpServletResponse.SC_OK);
      resp.setContentType("text/html");
      resp.getWriter().print("<html><body>Logged out.</body></html>");
    }
  }

  /**
   * Returns true if {@code url} is a server-relative path. A leading "//" or
   * "/\" is rejected because browsers resolve those as network-path references
   * to another host, which would turn this servlet into an open redirector.
   */
  private static boolean isLocalPath(String url) {
    return url != null
        && url.startsWith("/")
        && !url.startsWith("//")
        && !url.startsWith("/\\");
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/UserRegistrationServlet.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/UserRegistrationServlet.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/UserRegistrationServlet.java deleted file mode 100644 index d973812..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/UserRegistrationServlet.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.rpc; - -import com.google.gxp.base.GxpContext; -import com.google.inject.Inject; -import com.google.inject.name.Named; -import com.typesafe.config.Config; -import org.waveprotocol.box.server.CoreSettingsNames; -import org.waveprotocol.box.server.authentication.HttpRequestBasedCallbackHandler; -import org.waveprotocol.box.server.authentication.PasswordDigest; -import org.waveprotocol.box.server.gxp.UserRegistrationPage; -import org.waveprotocol.box.server.persistence.AccountStore; -import org.waveprotocol.box.server.robots.agent.welcome.WelcomeRobot; -import org.waveprotocol.box.server.util.RegistrationUtil; -import org.waveprotocol.wave.model.wave.InvalidParticipantAddress; -import org.waveprotocol.wave.model.wave.ParticipantId; - -import javax.inject.Singleton; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.util.Locale; - -/** - * The user registration servlet allows new users to register accounts. 
- * - * @author jose...@gmail.com (Joseph Gentle) - */ -@SuppressWarnings("serial") -@Singleton -public final class UserRegistrationServlet extends HttpServlet { - - private final AccountStore accountStore; - private final String domain; - private final WelcomeRobot welcomeBot; - private final boolean registrationDisabled; - private final String analyticsAccount; - - @Inject - public UserRegistrationServlet( - AccountStore accountStore, - @Named(CoreSettingsNames.WAVE_SERVER_DOMAIN) String domain, - Config config, - WelcomeRobot welcomeBot) { - - this.accountStore = accountStore; - this.domain = domain; - this.welcomeBot = welcomeBot; - this.registrationDisabled = config.getBoolean("administration.disable_registration"); - this.analyticsAccount = config.getString("administration.analytics_account"); - } - - @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { - writeRegistrationPage("", AuthenticationServlet.RESPONSE_STATUS_NONE, req.getLocale(), resp); - } - - @Override - protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { - req.setCharacterEncoding("UTF-8"); - - String message = null; - String responseType; - if (!registrationDisabled) { - message = tryCreateUser(req.getParameter(HttpRequestBasedCallbackHandler.ADDRESS_FIELD), - req.getParameter(HttpRequestBasedCallbackHandler.PASSWORD_FIELD)); - } - - if (message != null || registrationDisabled) { - resp.setStatus(HttpServletResponse.SC_FORBIDDEN); - responseType = AuthenticationServlet.RESPONSE_STATUS_FAILED; - } else { - message = "Registration complete."; - resp.setStatus(HttpServletResponse.SC_OK); - responseType = AuthenticationServlet.RESPONSE_STATUS_SUCCESS; - } - - writeRegistrationPage(message, responseType, req.getLocale(), resp); - } - - /** - * Try to create a user with the provided username and password. On error, - * returns a string containing an error message. On success, returns null. 
- */ - private String tryCreateUser(String username, String password) { - ParticipantId id; - try { - id = RegistrationUtil.checkNewUsername(domain, username); - } catch (InvalidParticipantAddress exception) { - return exception.getMessage(); - } - - if(RegistrationUtil.doesAccountExist(accountStore, id)) { - return "Account already exists"; - } - - if (password == null) { - // Register the user with an empty password. - password = ""; - } - - if (!RegistrationUtil.createAccountIfMissing(accountStore, id, - new PasswordDigest(password.toCharArray()), welcomeBot)) { - return "An unexpected error occurred while trying to create the account"; - } - - return null; - } - - private void writeRegistrationPage(String message, String responseType, Locale locale, - HttpServletResponse dest) throws IOException { - dest.setCharacterEncoding("UTF-8"); - dest.setContentType("text/html;charset=utf-8"); - UserRegistrationPage.write(dest.getWriter(), new GxpContext(locale), domain, message, - responseType, registrationDisabled, analyticsAccount); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/WaveClientServlet.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/WaveClientServlet.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/WaveClientServlet.java deleted file mode 100644 index 00f635a..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/WaveClientServlet.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.rpc; - -import com.google.common.collect.Maps; -import com.google.gxp.base.GxpContext; -import com.google.inject.Inject; -import com.google.inject.name.Named; -import com.typesafe.config.Config; -import org.apache.commons.lang.StringUtils; -import org.json.JSONException; -import org.json.JSONObject; -import org.waveprotocol.box.common.SessionConstants; -import org.waveprotocol.box.server.CoreSettingsNames; -import org.waveprotocol.box.server.account.AccountData; -import org.waveprotocol.box.server.authentication.SessionManager; -import org.waveprotocol.box.server.gxp.TopBar; -import org.waveprotocol.box.server.gxp.WaveClientPage; -import org.waveprotocol.box.server.util.RandomBase64Generator; -import org.waveprotocol.box.server.util.UrlParameters; -import org.waveprotocol.wave.client.util.ClientFlagsBase; -import org.waveprotocol.wave.common.bootstrap.FlagConstants; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.util.logging.Log; - -import javax.inject.Singleton; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.servlet.http.HttpSession; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; - -/** - * The HTTP servlet for serving a wave client along with content generated on - * the server. 
- * - * @author kal...@google.com (Benjamin Kalman) - */ -@SuppressWarnings("serial") -@Singleton -public class WaveClientServlet extends HttpServlet { - - private static final Log LOG = Log.get(WaveClientServlet.class); - - private static final HashMap<String, String> FLAG_MAP = Maps.newHashMap(); - static { - // __NAME_MAPPING__ is a map of name to obfuscated id - for (int i = 0; i < FlagConstants.__NAME_MAPPING__.length; i += 2) { - FLAG_MAP.put(FlagConstants.__NAME_MAPPING__[i], FlagConstants.__NAME_MAPPING__[i + 1]); - } - } - - private final String domain; - private final String analyticsAccount; - private final SessionManager sessionManager; - private final String websocketPresentedAddress; - - /** - * Creates a servlet for the wave client. - */ - @Inject - public WaveClientServlet( - @Named(CoreSettingsNames.WAVE_SERVER_DOMAIN) String domain, - Config config, - SessionManager sessionManager) { - List<String> httpAddresses = config.getStringList("core.http_frontend_addresses"); - String websocketAddress = config.getString("core.http_websocket_public_address"); - String websocketPresentedAddress = config.getString("core.http_websocket_presented_address"); - this.domain = domain; - String websocketAddress1 = StringUtils.isEmpty(websocketAddress) ? httpAddresses.get(0) : websocketAddress; - this.websocketPresentedAddress = StringUtils.isEmpty(websocketPresentedAddress) ? - websocketAddress1 : websocketPresentedAddress; - this.analyticsAccount = config.getString("administration.analytics_account"); - this.sessionManager = sessionManager; - } - - @Override - protected void doGet(HttpServletRequest request, HttpServletResponse response) - throws IOException { - ParticipantId id = sessionManager.getLoggedInUser(request.getSession(false)); - - // Eventually, it would be nice to show users who aren't logged in the public waves. - // However, public waves aren't implemented yet. For now, we'll just redirect users - // who haven't signed in to the sign in page. 
- if (id == null) { - response.sendRedirect(sessionManager.getLoginUrl("/")); - return; - } - - AccountData account = sessionManager.getLoggedInAccount(request.getSession(false)); - if (account != null) { - String locale = account.asHuman().getLocale(); - if (locale != null) { - String requestLocale = UrlParameters.getParameters(request.getQueryString()).get("locale"); - if (requestLocale == null) { - response.sendRedirect(UrlParameters.addParameter(request.getRequestURL().toString(), "locale", locale)); - return; - } - } - } - - String[] parts = id.getAddress().split("@"); - String username = parts[0]; - String userDomain = id.getDomain(); - - try { - WaveClientPage.write(response.getWriter(), new GxpContext(request.getLocale()), - getSessionJson(request.getSession(false)), getClientFlags(request), websocketPresentedAddress, - TopBar.getGxpClosure(username, userDomain), analyticsAccount); - } catch (IOException e) { - LOG.warning("Failed to write GXP for request " + request, e); - response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - return; - } - - response.setContentType("text/html"); - response.setStatus(HttpServletResponse.SC_OK); - } - - private JSONObject getClientFlags(HttpServletRequest request) { - try { - JSONObject ret = new JSONObject(); - - Enumeration<?> iter = request.getParameterNames(); - while (iter.hasMoreElements()) { - String name = (String) iter.nextElement(); - String value = request.getParameter(name); - - if (FLAG_MAP.containsKey(name)) { - // Set using the correct type of data in the json using reflection - try { - Method getter = ClientFlagsBase.class.getMethod(name); - Class<?> retType = getter.getReturnType(); - - if (retType.equals(String.class)) { - ret.put(FLAG_MAP.get(name), value); - } else if (retType.equals(Integer.class)) { - ret.put(FLAG_MAP.get(name), Integer.parseInt(value)); - } else if (retType.equals(Boolean.class)) { - ret.put(FLAG_MAP.get(name), Boolean.parseBoolean(value)); - } else if 
(retType.equals(Float.class)) { - ret.put(FLAG_MAP.get(name), Float.parseFloat(value)); - } else if (retType.equals(Double.class)) { - ret.put(FLAG_MAP.get(name), Double.parseDouble(value)); - } else { - // Flag exists, but its type is unknown, so it can not be - // properly encoded in JSON. - LOG.warning("Ignoring flag [" + name + "] with unknown return type: " + retType); - } - - // Ignore the flag on any exception - } catch (SecurityException | NumberFormatException ignored) { - } catch (NoSuchMethodException ex) { - LOG.warning("Failed to find the flag [" + name + "] in ClientFlagsBase."); - } - } - } - - return ret; - } catch (JSONException ex) { - LOG.severe("Failed to create flags JSON"); - return new JSONObject(); - } - } - - private JSONObject getSessionJson(HttpSession session) { - try { - ParticipantId user = sessionManager.getLoggedInUser(session); - String address = (user != null) ? user.getAddress() : null; - - // TODO(zdwang): Figure out a proper session id rather than generating a - // random number - String sessionId = (new RandomBase64Generator()).next(10); - - return new JSONObject() - .put(SessionConstants.DOMAIN, domain) - .putOpt(SessionConstants.ADDRESS, address) - .putOpt(SessionConstants.ID_SEED, sessionId); - } catch (JSONException e) { - LOG.severe("Failed to create session JSON"); - return new JSONObject(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/WaveRefServlet.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/WaveRefServlet.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/WaveRefServlet.java deleted file mode 100644 index fdbc895..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/WaveRefServlet.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.rpc; - -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import org.waveprotocol.box.server.authentication.SessionManager; -import org.waveprotocol.wave.model.wave.ParticipantId; - -import java.io.IOException; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -/** - * @author yur...@apache.org (Yuri Zelikov) - */ -@SuppressWarnings("serial") -@Singleton -public class WaveRefServlet extends HttpServlet { - - private final SessionManager sessionManager; - - @Inject - public WaveRefServlet(SessionManager sessionManager) { - this.sessionManager = sessionManager; - } - - @Override - protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, - IOException { - ParticipantId user = sessionManager.getLoggedInUser(req.getSession(false)); - String path = req.getRequestURI().replace("/waveref/", ""); - if (user != null) { - resp.sendRedirect("/#" + path); - } else { - resp.sendRedirect("/auth/signin?r=/#" + path); - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketChannel.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketChannel.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketChannel.java deleted file mode 100644 index 619b84e..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketChannel.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.rpc; - -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; -import com.google.protobuf.Message; - -import org.waveprotocol.box.server.rpc.ProtoSerializer.SerializationException; -import org.waveprotocol.wave.communication.gson.GsonException; -import org.waveprotocol.wave.communication.gson.GsonSerializable; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.IOException; -import org.waveprotocol.box.stat.SessionContext; -import org.waveprotocol.box.stat.Timer; -import org.waveprotocol.box.stat.Timing; - -/** - * A channel abstraction for websocket, for sending and receiving strings. - */ -public abstract class WebSocketChannel extends MessageExpectingChannel { - private static final Log LOG = Log.get(WebSocketChannel.class); - - /** - * Envelope for delivering arbitrary messages. Each envelope has a sequence - * number and a message. - * <p> - * Note that this message can not be described by a protobuf, because it - * contains an arbitrary protobuf, which breaks the protobuf typing rules. 
- */ - private static class MessageWrapper { - private final static JsonParser parser = new JsonParser(); - - final int sequenceNumber; - final String messageType; - final JsonElement message; - - public MessageWrapper(int sequenceNumber, String messageType, JsonElement message) { - this.sequenceNumber = sequenceNumber; - this.messageType = messageType; - this.message = message; - } - - public static MessageWrapper deserialize(Gson gson, String data) { - JsonElement e = parser.parse(data); - JsonObject obj = e.getAsJsonObject(); - String type = obj.get("messageType").getAsString(); - int seqno = obj.get("sequenceNumber").getAsInt(); - JsonElement message = obj.get("message"); - return new MessageWrapper(seqno, type, message); - } - - public static String serialize(String type,int seqno, JsonElement message) { - JsonObject o = new JsonObject(); - o.add("messageType", new JsonPrimitive(type)); - o.add("sequenceNumber", new JsonPrimitive(seqno)); - o.add("message", message); - return o.toString(); - } - } - - private final ProtoCallback callback; - private final SessionContext sessionContext; - private final Gson gson = new Gson(); - private final ProtoSerializer serializer; - - /** - * Constructs a new WebSocketChannel, using the callback to handle any - * incoming messages. - * - * @param callback a protocallback to be called when data arrives on this - * channel - */ - public WebSocketChannel(ProtoCallback callback) { - this.callback = callback; - this.sessionContext = - Timing.isEnabled() ? Timing.getScopeValue(SessionContext.class) : null; - // The ProtoSerializer could really be singleton. 
- // TODO: Figure out a way to inject a singleton instance using Guice - this.serializer = new ProtoSerializer(); - } - - public void handleMessageString(String data) { - LOG.fine("received JSON message " + data); - if (Timing.isEnabled()) { - Timing.enterScope(); - Timing.setScopeValue(SessionContext.class, sessionContext); - } - try { - Message message; - - MessageWrapper wrapper = MessageWrapper.deserialize(gson, data); - - try { - message = serializer.fromJson(wrapper.message, wrapper.messageType); - } catch (SerializationException e) { - LOG.warning("message handling error", e); - e.printStackTrace(); - return; - } - callback.message(wrapper.sequenceNumber, message); - } finally { - Timing.exitScope(); - } - } - - static <T extends GsonSerializable> T load(JsonElement payload, T x, Gson gson) { - try { - x.fromGson(payload, gson, null); - return x; - } catch (GsonException e) { - LOG.warning("JSON load error", e); - e.printStackTrace(); - return null; - } - } - - /** - * Sends a message on the socket. - * - * @param data message to send - * @throws IOException if the communication fails - */ - protected abstract void sendMessageString(String data) throws IOException; - - @Override - public void sendMessage(int sequenceNo, Message message) { - JsonElement json; - String str; - - Timer timer = Timing.start("serializeMessage"); - try { - json = serializer.toJson(message); - String type = message.getDescriptorForType().getName(); - str = MessageWrapper.serialize(type, sequenceNo, json); - } catch (SerializationException e) { - LOG.warning("Failed to JSONify proto message", e); - return; - } finally { - Timing.stop(timer); - } - try { - sendMessageString(str); - LOG.fine("sent JSON message over websocket, sequence number " + sequenceNo - + ", message " + message); - } catch (IOException e) { - // TODO(anorth): This failure should be communicated to the caller - // so it can attempt retransmission. 
- LOG.warning("Failed to transmit message on socket, sequence number " + sequenceNo - + ", message " + message, e); - return; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketChannelImpl.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketChannelImpl.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketChannelImpl.java deleted file mode 100644 index c74e07a..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketChannelImpl.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.rpc; - -import org.waveprotocol.wave.util.logging.Log; - -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; -import org.eclipse.jetty.websocket.api.annotations.WebSocket; - - -import java.io.IOException; - -/** - * A channel implementation for websocket. - * - * @author akapla...@gmai.com (A. Kaplanov) - */ -@WebSocket -public class WebSocketChannelImpl extends WebSocketChannel { - private static final Log LOG = Log.get(WebSocketChannelImpl.class); - - private Session session; - - public WebSocketChannelImpl(ProtoCallback callback) { - super(callback); - } - - @OnWebSocketConnect - public void onOpen(Session session) { - synchronized (this) { - this.session = session; - } - } - - @OnWebSocketMessage - public void onMessage(String data) { - handleMessageString(data); - } - - @OnWebSocketClose - public void onClose(int closeCode, String closeReason) { - LOG.fine("websocket disconnected (" + closeCode + " - " + closeReason + "): " + this); - synchronized (this) { - session = null; - } - } - - @Override - protected void sendMessageString(String data) throws IOException { - synchronized (this) { - if (session == null) { - LOG.warning("Websocket is not connected"); - } else { - session.getRemote().sendStringByFuture(data); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketClientRpcChannel.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketClientRpcChannel.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketClientRpcChannel.java deleted file mode 100644 index 272e205..0000000 --- 
a/wave/src/main/java/org/waveprotocol/box/server/rpc/WebSocketClientRpcChannel.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.rpc; - -import org.waveprotocol.wave.util.logging.Log; - -import com.google.common.base.Preconditions; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; -import org.eclipse.jetty.websocket.client.WebSocketClient; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Implementation of {@link ClientRpcChannel} based on a - * {@link WebSocketClientChannel}. 
- */ -public class WebSocketClientRpcChannel implements ClientRpcChannel { - private static final Log LOG = Log.get(WebSocketClientRpcChannel.class); - - private final WebSocketClient socketClient; - private final WebSocketChannel clientChannel; - private final AtomicInteger lastSequenceNumber = new AtomicInteger(); - private final BiMap<Integer, ClientRpcController> activeMethodMap = HashBiMap.create(); - - /** - * Set up a new WebSocketClientRpcChannel pointing at the given server - * address. - * - * @param serverAddress the target server address - */ - public WebSocketClientRpcChannel(SocketAddress serverAddress) - throws IOException { - Preconditions.checkNotNull(serverAddress, "null serverAddress"); - - ProtoCallback callback = new ProtoCallback() { - @Override - public void message(int sequenceNo, Message message) { - final ClientRpcController controller; - synchronized (activeMethodMap) { - controller = activeMethodMap.get(sequenceNo); - // TODO: remove controller from activeMethodMap - } - if (message instanceof Rpc.RpcFinished) { - Rpc.RpcFinished finished = (Rpc.RpcFinished) message; - if (finished.getFailed()) { - controller.failure(finished.getErrorText()); - } else { - controller.response(null); - } - } else { - controller.response(message); - } - } - }; - clientChannel = new WebSocketChannelImpl(callback); - socketClient = openWebSocket(clientChannel, (InetSocketAddress) serverAddress); - clientChannel.expectMessage(Rpc.RpcFinished.getDefaultInstance()); - LOG.fine("Opened a new WebSocketClientRpcChannel to " + serverAddress); - } - - @Override - public RpcController newRpcController() { - return new ClientRpcController(this); - } - - @Override - public void callMethod(MethodDescriptor method, RpcController genericRpcController, - Message request, Message responsePrototype, RpcCallback<Message> callback) { - // Cast the given generic controller to a ClientRpcController. 
- final ClientRpcController controller; - if (genericRpcController instanceof ClientRpcController) { - controller = (ClientRpcController) genericRpcController; - } else { - throw new IllegalArgumentException("Expected ClientRpcController, got: " - + genericRpcController.getClass()); - } - - // Generate a new sequence number, and configure the controller - notably, - // this throws an IllegalStateException if it is *already* configured. - final int sequenceNo = lastSequenceNumber.incrementAndGet(); - final ClientRpcController.RpcState rpcStatus = - new ClientRpcController.RpcState(this, method.getOptions() - .getExtension(Rpc.isStreamingRpc), callback, new Runnable() { - @Override - public void run() { - clientChannel.sendMessage(sequenceNo, Rpc.CancelRpc.getDefaultInstance()); - } - }); - controller.configure(rpcStatus); - synchronized (activeMethodMap) { - activeMethodMap.put(sequenceNo, controller); - } - LOG.fine("Calling a new RPC (seq " + sequenceNo + "), method " + method.getFullName() + " for " - + clientChannel); - - // Kick off the RPC by sending the request to the server end-point. 
- clientChannel.sendMessage(sequenceNo, request, responsePrototype); - } - - private WebSocketClient openWebSocket(WebSocketChannel clientChannel, - InetSocketAddress inetAddress) throws IOException { - URI uri; - try { - uri = new URI("ws", null, inetAddress.getHostName(), inetAddress.getPort(), "/socket", - null, null); - } catch (URISyntaxException e) { - LOG.severe("Unable to create ws:// uri from given address (" + inetAddress + ")", e); - throw new IllegalStateException(e); - } - WebSocketClient client = new WebSocketClient(); - try { - client.start(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - ClientUpgradeRequest request = new ClientUpgradeRequest(); - try { - client.connect(clientChannel, uri, request).get(); - } catch (Exception ex) { - throw new IOException(ex); - } - return client; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereChannel.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereChannel.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereChannel.java deleted file mode 100644 index 4bc9adb..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereChannel.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.rpc.atmosphere; - - -import org.atmosphere.cpr.AtmosphereResource; -import org.atmosphere.cpr.Broadcaster; -import org.atmosphere.cpr.BroadcasterFactory; -import org.waveprotocol.box.server.rpc.ProtoCallback; -import org.waveprotocol.box.server.rpc.WebSocketChannel; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.IOException; - -/** - * An atmosphere wrapper for the WebSocketChannel type. - * - * @author pablo...@gmail.com (Pablo Ojanguren) - */ -public class AtmosphereChannel extends WebSocketChannel { - - - private static final Log LOG = Log.get(AtmosphereChannel.class); - - /* The object needed to send messages out */ - private Broadcaster broadcaster; - - - - /** - * Creates a new AtmosphereChannel using the callback for incoming messages. - * - * @param callback A ProtoCallback instance called with incoming messages. 
- */ - public AtmosphereChannel(ProtoCallback callback) { - super(callback); - broadcaster = BroadcasterFactory.getDefault().get(); - - } - - /** - * A new resource connection has been associated with - * this channel - * @param resource the Atmosphere resource object - */ - public void onConnect(AtmosphereResource resource) { - - // Create a new broadcaster to publish to this resource - broadcaster.addAtmosphereResource(resource); - } - - - public Broadcaster getBroadcaster() { - return broadcaster; - } - - /** - * The atmosphere resource has received a new post message - * @param message the message - */ - public void onMessage(String message) { - - handleMessageString(message); - } - - /** - * The atmosphere resource has been closed - */ - public void onDisconnect() { - - broadcaster = null; - } - - - - /** - * Send the given data String - * - * @param data - * @throws IOException - */ - @Override - protected void sendMessageString(String data) throws IOException { - - if (broadcaster == null || broadcaster.isDestroyed()) { - // Just drop the message. It's rude to throw an exception since the - // caller had no way of knowing. 
- LOG.warning("Atmosphere Channel is not connected"); - } else { - - LOG.fine("BROADCAST "+data); - broadcaster.broadcast(data); - } - } - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereClientInterceptor.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereClientInterceptor.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereClientInterceptor.java deleted file mode 100644 index 12468d3..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/AtmosphereClientInterceptor.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.rpc.atmosphere; - - - -import com.google.common.io.ByteStreams; - -import org.atmosphere.config.service.AtmosphereInterceptorService; -import org.atmosphere.cpr.Action; -import org.atmosphere.cpr.AtmosphereConfig; -import org.atmosphere.cpr.AtmosphereInterceptor; -import org.atmosphere.cpr.AtmosphereRequest; -import org.atmosphere.cpr.AtmosphereResource; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - - /** - * Serve atmosphere.js file to GWT clients This class allows to serve the - * Atmosphere Javascript client from the same /atmosphere servlet path which is - * used to process any Atmosphere request instead of place it on the /static - * path. In addition, the Javascript file is put together with the rest of - * Atmosphere related source code, during the build process as any other third - * party dependency. - * - * - * @author pablo...@gmail.com (Pablo Ojanguren) - */ - @AtmosphereInterceptorService - public class AtmosphereClientInterceptor implements AtmosphereInterceptor { - - private static final Log LOG = Log.get(AtmosphereClientInterceptor.class); - - @Override - public void configure(AtmosphereConfig config) { - // Nothing to do - } - - @Override - public Action inspect(AtmosphereResource resource) { - - AtmosphereRequest request = resource.getRequest(); - - try { - // Find the first context parameter - String path = request.getPathInfo(); - - if (path == null || path.isEmpty()) - return Action.CONTINUE; - - if (path.startsWith("/")) { - path = path.substring(1); - } - String[] parts = path.split("/"); - - // Serve the file - if (parts.length > 0 && "GET".equals(resource.getRequest().getMethod()) && "atmosphere.js".equals(parts[0])) { - resource.getResponse().setContentType("text/javascript"); - InputStream is = - this.getClass().getClassLoader() - 
.getResourceAsStream("org/waveprotocol/box/server/rpc/atmosphere/atmosphere.js"); - OutputStream os = resource.getResponse().getOutputStream(); - ByteStreams.copy(is, os); - return Action.CANCELLED; - - } - - - } catch (IOException e) { - LOG.severe("Error sending atmosphere.js",e); - } - - - return Action.CONTINUE; - } - - - @Override - public void postInspect(AtmosphereResource resource) { - // Nothing to do - } - - } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/GuiceAtmosphereFactory.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/GuiceAtmosphereFactory.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/GuiceAtmosphereFactory.java deleted file mode 100644 index 3611caf..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/rpc/atmosphere/GuiceAtmosphereFactory.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.rpc.atmosphere; - -import com.google.inject.Injector; - -import org.atmosphere.cpr.AtmosphereFramework; -import org.atmosphere.cpr.AtmosphereObjectFactory; -import org.waveprotocol.wave.util.logging.Log; - -/** - * Custom factory to use wave's guice injector in Atmosphere - * - * @author pablo...@gmail.com (Pablo Ojanguren) - * - */ -public class GuiceAtmosphereFactory implements AtmosphereObjectFactory { - - private static final Log LOG = Log.get(GuiceAtmosphereFactory.class); - - private static Injector injector; - - @SuppressWarnings("unchecked") - @Override - public <T, U extends T> U newClassInstance(AtmosphereFramework framework, Class<T> classType, Class<U> classToInstantiate) throws InstantiationException, IllegalAccessException { - initInjector(framework); - - - if (injector == null) { - return classToInstantiate.newInstance(); - } else { - return injector.getInstance(classToInstantiate); - } - } - - public String toString() { - return "Guice ObjectFactory"; - } - - private void initInjector(AtmosphereFramework framework) { - if (injector == null) { - com.google.inject.Injector servletInjector = (com.google.inject.Injector) - framework.getServletContext().getAttribute( - com.google.inject.Injector.class.getName()); - - if (servletInjector != null) { - injector = servletInjector; - LOG.fine("Existing injector found to create Atmosphere instances"); - } else { - LOG.fine("Not injector not found to create Atmosphere instances"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/rpc/testing/FakeServerRpcController.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/rpc/testing/FakeServerRpcController.java b/wave/src/main/java/org/waveprotocol/box/server/rpc/testing/FakeServerRpcController.java deleted file mode 100644 index fa45cd6..0000000 --- 
a/wave/src/main/java/org/waveprotocol/box/server/rpc/testing/FakeServerRpcController.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.rpc.testing; - -import static org.waveprotocol.box.server.util.testing.TestingConstants.USER; - -import com.google.protobuf.RpcCallback; - -import org.waveprotocol.box.server.rpc.ServerRpcController; -import org.waveprotocol.wave.model.wave.ParticipantId; - - -/** - * An {@code RpcController} that just handles error text and failure condition. 
- */ -public class FakeServerRpcController implements ServerRpcController { - private boolean failed = false; - private String errorText = null; - - @Override - public String errorText() { - return errorText; - } - - @Override - public boolean failed() { - return failed; - } - - @Override - public boolean isCanceled() { - return false; - } - - @Override - public void notifyOnCancel(RpcCallback<Object> arg) { - } - - @Override - public void reset() { - failed = false; - errorText = null; - } - - @Override - public void setFailed(String error) { - failed = true; - errorText = error; - } - - @Override - public void startCancel() { - } - - @Override - public ParticipantId getLoggedInUser() { - return ParticipantId.ofUnsafe(USER); - } - - @Override - public void cancel() { - } - - @Override - public void run() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/051db092/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java b/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java deleted file mode 100644 index eb0c893..0000000 --- a/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.shutdown; - -import org.waveprotocol.wave.model.util.Preconditions; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * Life cycle control. - * - * @author akapla...@gmail.com (A. Kaplanov) - */ -public class LifeCycle { - - private static int SHUTDOWN_TIMEOUT_SEC = 2; - - private final String name; - private final ShutdownPriority shutdownPriority; - private final Shutdownable shutdownHandler; - private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); - private final ShutdownManager shutdownManager; - private boolean started; - - /** - * Creates lifecycle. - * - * @param name the name of task. - * @param shutdownPriority determines shutdown order. - * @param shutdownHandler the handler executed on shutdown. - */ - public LifeCycle(String name, ShutdownPriority shutdownPriority, Shutdownable shutdownHandler) { - this(name, shutdownPriority, shutdownHandler, ShutdownManager.getInstance()); - } - - /** - * Creates lifecycle. - * - * @param name the name of task. - * @param shutdownPriority determines shutdown order. - * @param shutdownHandler the handler executed on shutdown. - * @param shutdownManager the shutdown manager. 
- */ - public LifeCycle(String name, ShutdownPriority shutdownPriority, Shutdownable shutdownHandler, - ShutdownManager shutdownManager) { - this.name = name; - this.shutdownPriority = shutdownPriority; - this.shutdownHandler = shutdownHandler; - this.shutdownManager = shutdownManager; - } - - /** - * Starts lifecycle. - */ - public synchronized void start() { - Preconditions.checkArgument(!started, name + " is already started."); - started = true; - shutdownManager.register(new Shutdownable() { - - @Override - public void shutdown() throws Exception { - synchronized (LifeCycle.this) { - if (shutdownHandler != null) { - shutdownHandler.shutdown(); - } - if (!semaphore.tryAcquire(Integer.MAX_VALUE, SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS)) { - throw new TimeoutException(); - } - started = false; - } - } - }, name, shutdownPriority); - } - - /** - * Enters to execution block of task. - */ - public synchronized void enter() { - checkIsStarted(); - try { - semaphore.acquire(); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - - /** - * Leaves execution block of task. - */ - public synchronized void leave() { - semaphore.release(); - } - - private void checkIsStarted() { - if (!started) { - throw new IllegalStateException(name + " is not started"); - } - } -}