Author: solomax
Date: Wed Jun 20 08:23:00 2012
New Revision: 1351985

URL: http://svn.apache.org/viewvc?rev=1351985&view=rev
Log:
OPENMEETINGS-295 RTMPT screen sharing client is fixed.

Added:
    
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/
    
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java
    
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java
Modified:
    incubator/openmeetings/trunk/singlewebapp/build.xml

Added: 
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java?rev=1351985&view=auto
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java
 (added)
+++ 
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClient.java
 Wed Jun 20 08:23:00 2012
@@ -0,0 +1,130 @@
+/*
+ * RED5 Open Source Flash Server - http://code.google.com/p/red5/
+ * 
+ * Copyright 2006-2012 by respective authors (see below). All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.red5.client.net.rtmpt;
+
+import java.util.Map;
+
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.red5.client.net.rtmp.BaseRTMPClientHandler;
+import org.red5.io.object.Deserializer;
+import org.red5.io.object.Serializer;
+import org.red5.server.net.protocol.ProtocolState;
+import org.red5.server.net.rtmp.RTMPConnection;
+import org.red5.server.net.rtmp.RTMPMinaConnection;
+import org.red5.server.net.rtmp.codec.RTMP;
+import org.red5.server.net.rtmp.codec.RTMPProtocolDecoder;
+import org.red5.server.net.rtmp.codec.RTMPProtocolEncoder;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.net.rtmpt.codec.RTMPTCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * RTMPT client object
+ * 
+ * @author Anton Lebedevich
+ */
+public class RTMPTClient extends BaseRTMPClientHandler {
+
+       private static final Logger log = 
LoggerFactory.getLogger(RTMPTClient.class);
+
+       // guarded by this
+       private RTMPTClientConnector connector;
+       
+       private RTMPTCodecFactory codecFactory;
+       
+       public RTMPTClient() {
+               codecFactory = new RTMPTCodecFactory();
+               codecFactory.setDeserializer(new Deserializer());
+               codecFactory.setSerializer(new Serializer());
+               codecFactory.init();
+       }
+
+       public Map<String, Object> makeDefaultConnectionParams(String server, 
int port, String application) {
+               Map<String, Object> params = 
super.makeDefaultConnectionParams(server, port, application);
+               if (!params.containsKey("tcUrl")) {
+                       params.put("tcUrl", "rtmpt://" + server + ':' + port + 
'/' + application);
+               }
+               return params;
+       }
+
+       protected synchronized void startConnector(String server, int port) {
+               connector = new RTMPTClientConnector(server, port, this);
+               log.debug("Created connector {}", connector);
+               connector.start();
+       }
+
+       /** {@inheritDoc} */
+       @Override
+       public void messageReceived(Object in, IoSession session) throws 
Exception {
+               RTMPConnection conn = (RTMPTClientConnection) 
session.getAttribute(RTMPConnection.RTMP_CONNECTION_KEY);
+               RTMP state = (RTMP) 
session.getAttribute(ProtocolState.SESSION_KEY);
+               if (in instanceof IoBuffer) {
+                       rawBufferRecieved(conn, state, (IoBuffer) in);
+               } else {
+                       super.messageReceived(in, session);
+               }
+       }
+
+       /**
+        * Handle raw buffer receipt
+        * 
+        * @param conn
+        *            RTMP connection
+        * @param state
+        *            Protocol state
+        * @param in
+        *            IoBuffer with input raw data
+        */
+       private void rawBufferRecieved(RTMPConnection conn, ProtocolState 
state, IoBuffer _in) {
+               log.debug("Handshake 3d phase - size: {}", _in.remaining());
+               IoBuffer out = IoBuffer.allocate(Constants.HANDSHAKE_SIZE);
+               IoBuffer in = _in;
+               if (!in.isAutoExpand()) {
+                       in = IoBuffer.allocate(in.position() + 
Constants.HANDSHAKE_SIZE);
+                       in.setAutoExpand(true);
+                       in.put(in);
+               }
+               in.skip(1);
+               in.limit(in.position() + Constants.HANDSHAKE_SIZE);
+               out.put(in);
+               out.flip();
+               conn.writeRaw(out);
+               connectionOpened(conn, conn.getState());
+       }
+
+       public synchronized void disconnect() {
+               if (connector != null) {
+                       connector.setStopRequested(true);
+                       connector.interrupt();
+               }
+               super.disconnect();
+       }
+
+       public RTMPProtocolDecoder getDecoder() {
+               return codecFactory.getRTMPDecoder();
+       }
+
+       public RTMPProtocolEncoder getEncoder() {
+               return codecFactory.getRTMPEncoder();
+       }
+       
+       
+}

Added: 
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java?rev=1351985&view=auto
==============================================================================
--- 
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java
 (added)
+++ 
incubator/openmeetings/trunk/singlewebapp/WebContent/red5/client_rtmpt_patch/RTMPTClientConnector.java
 Wed Jun 20 08:23:00 2012
@@ -0,0 +1,215 @@
+/*
+ * RED5 Open Source Flash Server - http://code.google.com/p/red5/
+ * 
+ * Copyright 2006-2012 by respective authors (see below). All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.red5.client.net.rtmpt;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpVersion;
+import org.apache.http.ParseException;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.CoreProtocolPNames;
+import org.apache.http.util.EntityUtils;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IoSession;
+import org.red5.client.net.rtmp.RTMPClientConnManager;
+import org.red5.server.net.protocol.ProtocolState;
+import org.red5.server.net.rtmp.RTMPConnection;
+import org.red5.server.net.rtmp.codec.RTMP;
+import org.red5.server.net.rtmp.message.Constants;
+import org.red5.server.util.HttpConnectionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client connector for RTMPT
+ * 
+ * @author Anton Lebedevich ([email protected])
+ * @author Paul Gregoire ([email protected])
+ */
+class RTMPTClientConnector extends Thread {
+       
+       private static final Logger log = 
LoggerFactory.getLogger(RTMPTClientConnector.class);
+
+       private static final String CONTENT_TYPE = "application/x-fcs";
+
+       private static final ByteArrayEntity ZERO_REQUEST_ENTITY = new 
ByteArrayEntity(new byte[] { 0 });
+
+       /**
+        * Size to split messages queue by, borrowed from
+        * RTMPTServlet.RESPONSE_TARGET_SIZE
+        */
+       private static final int SEND_TARGET_SIZE = 32768;
+
+       private final DefaultHttpClient httpClient = 
HttpConnectionUtil.getClient();
+
+       private final HttpHost targetHost;
+       
+       private final RTMPTClient client;
+
+       private final RTMPClientConnManager connManager;
+
+       private int clientId;
+
+       private long messageCount = 1;
+
+       private volatile boolean stopRequested = false;
+
+       public RTMPTClientConnector(String server, int port, RTMPTClient 
client) {
+               targetHost = new HttpHost(server, port, "http");
+               
httpClient.getParams().setParameter(CoreProtocolPNames.PROTOCOL_VERSION, 
HttpVersion.HTTP_1_1);
+               this.client = client;
+               this.connManager = RTMPClientConnManager.getInstance();
+       }
+
+       public void run() {
+               HttpPost post = null;   
+               try {
+                       RTMPTClientConnection connection = openConnection();
+                       while (!connection.isClosing() && !stopRequested) {
+                               IoBuffer toSend = 
connection.getPendingMessages(SEND_TARGET_SIZE);                      
+                               int limit = toSend != null ? toSend.limit() : 0;
+                               if (limit > 0) {
+                                       post = makePost("send");
+                                       post.setEntity(new 
InputStreamEntity(toSend.asInputStream(), limit));
+                                       post.addHeader("Content-Type", 
CONTENT_TYPE);
+                               } else {
+                                       post = makePost("idle");
+                                       post.setEntity(ZERO_REQUEST_ENTITY);
+                               }
+                               // execute
+                               HttpResponse response = 
httpClient.execute(targetHost, post);
+                               // check for error
+                               checkResponseCode(response);
+                               // handle data
+                               byte[] received = 
EntityUtils.toByteArray(response.getEntity());
+                               IoBuffer data = IoBuffer.wrap(received);
+                               if (data.limit() > 0) {
+                                       data.skip(1); // XXX: polling interval 
lies in this byte
+                               }
+                               List<?> messages = connection.decode(data);
+                               if (messages == null || messages.isEmpty()) {
+                                       try {
+                                               // XXX handle polling delay
+                                               Thread.sleep(250);
+                                       } catch (InterruptedException e) {
+                                               if (stopRequested) {
+                                                       post.abort();
+                                                       break;
+                                               }
+                                       }
+                                       continue;
+                               }
+                               IoSession session = new DummySession();
+                               
session.setAttribute(RTMPConnection.RTMP_CONNECTION_KEY, connection);
+                               session.setAttribute(ProtocolState.SESSION_KEY, 
connection.getState());
+                               for (Object message : messages) {
+                                       try {
+                                               client.messageReceived(message, 
session);
+                                       } catch (Exception e) {
+                                               log.error("Could not process 
message.", e);
+                                       }
+                               }
+                       }
+                       finalizeConnection();
+                       client.connectionClosed(connection, 
connection.getState());
+               } catch (Throwable e) {
+                       log.debug("RTMPT handling exception", e);
+                       client.handleException(e);
+                       if (post != null) {
+                               post.abort();
+                       }
+               }
+       }
+
+       private RTMPTClientConnection openConnection() throws IOException {
+               RTMPTClientConnection connection = null;
+               HttpPost openPost = new HttpPost("/open/1");
+               setCommonHeaders(openPost);
+               openPost.setEntity(ZERO_REQUEST_ENTITY);
+               // execute
+               HttpResponse response = httpClient.execute(targetHost, 
openPost);
+               checkResponseCode(response);
+               // get the response entity
+               HttpEntity entity = response.getEntity();
+               if (entity != null) {
+                       String responseStr = EntityUtils.toString(entity);
+                       clientId = Integer.parseInt(responseStr.substring(0, 
responseStr.length() - 1));
+                       log.debug("Got client id {}", clientId);
+                       // create a new connection
+                       connection = (RTMPTClientConnection) 
connManager.createConnection(RTMPTClientConnection.class);
+                       // client state
+                       RTMP state = new RTMP();
+                       connection.setState(state);
+                       connection.setHandler(client);
+                       connection.setDecoder(client.getDecoder());
+                       connection.setEncoder(client.getEncoder());
+                       log.debug("Handshake 1st phase");
+                       IoBuffer handshake = 
IoBuffer.allocate(Constants.HANDSHAKE_SIZE + 1);
+                       handshake.put((byte) 0x03);
+                       handshake.fill((byte) 0x01, Constants.HANDSHAKE_SIZE);
+                       handshake.flip();
+                       connection.writeRaw(handshake);
+               }
+               return connection;                      
+       }
+
+       private void finalizeConnection() throws IOException {
+               log.debug("Sending close post");
+               HttpPost closePost = new HttpPost(makeUrl("close"));
+               closePost.setEntity(ZERO_REQUEST_ENTITY);
+               HttpResponse response = httpClient.execute(targetHost, 
closePost);
+               EntityUtils.consume(response.getEntity());
+       }
+
+       private HttpPost makePost(String command) {
+               HttpPost post = new HttpPost(makeUrl(command));
+               setCommonHeaders(post);
+               return post;
+       }
+
+       private String makeUrl(String command) {
+               // use message count from connection
+               return String.format("/%s/%s/%s", command, clientId, 
messageCount++);
+       }
+
+       private void setCommonHeaders(HttpPost post) {
+               post.addHeader("Connection", "Keep-Alive");
+               post.addHeader("Cache-Control", "no-cache");
+       }
+
+       private void checkResponseCode(HttpResponse response) throws 
ParseException, IOException {
+               int code = response.getStatusLine().getStatusCode();
+               if (code != HttpStatus.SC_OK) {
+                       throw new RuntimeException("Bad HTTP status returned, 
line: " + response.getStatusLine() + "; body: " + 
EntityUtils.toString(response.getEntity()));
+               }
+       }
+
+       public void setStopRequested(boolean stopRequested) {
+               this.stopRequested = stopRequested;
+       }
+}

Modified: incubator/openmeetings/trunk/singlewebapp/build.xml
URL: 
http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/build.xml?rev=1351985&r1=1351984&r2=1351985&view=diff
==============================================================================
--- incubator/openmeetings/trunk/singlewebapp/build.xml (original)
+++ incubator/openmeetings/trunk/singlewebapp/build.xml Wed Jun 20 08:23:00 2012
@@ -34,7 +34,8 @@
        <property name="mainlibs.lib.dir" value="${project.lib.dir}/mainlibs" />
        <property name="om.lib.dir" value="${project.lib.dir}/om" />
        <property name="anakia.lib.dir" value="${project.lib.dir}/anakia" />
-       <property name="screensharing.resources" 
value="${basedir}/WebContent/screensharing" />
+       <property name="webcontent.src.dir" value="${basedir}/WebContent" />
+       <property name="screensharing.resources" 
value="${webcontent.src.dir}/screensharing" />
        <property name="junit.lib.dir" value="${project.lib.dir}/junit" />
        <property name="rat.lib.dir" value="${project.lib.dir}/rat" />
        <property name="dtd-generator.lib.dir" 
value="${project.lib.dir}/dtd-generator" />
@@ -58,7 +59,6 @@
 
        <!-- LPS Properties -->
        <property name="out.dir.swf" value="${basedir}/${dist.webapps.dir}/" />
-       <property name="webcontent.src.dir" value="${basedir}/WebContent" />
        <property name="laszlo.as3.src.dir" value="${webcontent.src.dir}/swf10" 
/>
        <property name="laszlo.src.dir" value="${webcontent.src.dir}/src" />
 
@@ -213,10 +213,10 @@
 
                <!-- add language files from .war version -->
                <copy todir="${dist.webapps.dir}/languages">
-                       <fileset dir="WebContent/languages" />
+                       <fileset dir="${webcontent.src.dir}/languages" />
                </copy>
                <copy todir="${dist.webapps.dir}/conf">
-                       <fileset dir="WebContent/conf" />
+                       <fileset dir="${webcontent.src.dir}/conf" />
                </copy>
                <tstamp />
        </target>
@@ -555,7 +555,7 @@
                        </classpath>
                </taskdef>
 
-               <xslt in="WebContent/languages/errorvalues.xml" 
out="${docs.src}/ErrorsTable.xml" 
style="${docs.src}/stylesheets/errortable.xsl" />
+               <xslt in="${webcontent.src.dir}/languages/errorvalues.xml" 
out="${docs.src}/ErrorsTable.xml" 
style="${docs.src}/stylesheets/errortable.xsl" />
                <anakia basedir="${docs.src}" destdir="${docs.dest}/" 
extension=".html" style="./site.vsl" projectFile="stylesheets/project.xml" 
excludes="**/stylesheets/** empty.xml" includes="**/*.xml" 
lastModifiedCheck="true" templatePath="xdocs/stylesheets" 
velocityPropertiesFile="${build.base.dir}/velocity.properties" />
                <copy todir="${docs.dest}/images" filtering="no">
                        <fileset dir="${docs.src}/images">
@@ -777,7 +777,9 @@
                <svn refid="svn.settings">
                        <checkout 
url="http://red5.googlecode.com/svn/java/client/trunk/"; 
revision="${red5.client.revision}" destPath="${red5.client.dir}" />
                </svn>
-               <copy file="${red5.lib}/red5.jar" 
todir="${red5.client.dir}/lib" />
+               <copy file="${red5.lib}/red5.jar" 
todir="${red5.client.dir}/lib" overwrite="true" force="true" />
+               <copy 
file="${webcontent.src.dir}/red5/client_rtmpt_patch/RTMPTClient.java" 
todir="${red5.client.dir}/src/org/red5/client/net/rtmpt" overwrite="true" 
force="true" />
+               <copy 
file="${webcontent.src.dir}/red5/client_rtmpt_patch/RTMPTClientConnector.java" 
todir="${red5.client.dir}/src/org/red5/client/net/rtmpt" overwrite="true" 
force="true" />
                <subant target="dist">
                        <fileset dir="${red5.client.dir}" includes="build.xml" 
/>
                </subant>


Reply via email to