Author: solomax
Date: Tue May 15 07:05:21 2012
New Revision: 1338566
URL: http://svn.apache.org/viewvc?rev=1338566&view=rev
Log:
OPENMEETINGS-266 Proxy was modified to handle errors; Applet was modified to
handle proxy exceptions as expected.
Modified:
incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/app/remote/red5/ScopeApplicationAdapter.java
incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/CoreScreenShare.java
incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/ScreenV1Encoder.java
Modified: incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/app/remote/red5/ScopeApplicationAdapter.java
URL: http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/app/remote/red5/ScopeApplicationAdapter.java?rev=1338566&r1=1338565&r2=1338566&view=diff
==============================================================================
--- incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/app/remote/red5/ScopeApplicationAdapter.java (original)
+++ incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/app/remote/red5/ScopeApplicationAdapter.java Tue May 15 07:05:21 2012
@@ -51,6 +51,7 @@ import org.openmeetings.app.persistence.
import org.openmeetings.app.remote.FLVRecorderService;
import org.openmeetings.app.remote.WhiteBoardService;
import org.openmeetings.utils.math.CalendarPatterns;
+import org.red5.io.utils.ObjectMap;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.adapter.ApplicationAdapter;
import org.red5.server.api.IBasicScope;
@@ -64,11 +65,21 @@ import org.red5.server.api.service.IServ
import org.red5.server.api.stream.IBroadcastStream;
import org.red5.server.api.stream.IStreamListener;
import org.red5.server.api.stream.IStreamPacket;
+import org.red5.server.messaging.IMessage;
+import org.red5.server.messaging.IMessageComponent;
+import org.red5.server.messaging.IPipe;
+import org.red5.server.messaging.OOBControlMessage;
+import org.red5.server.messaging.PipeConnectionEvent;
+import org.red5.server.net.rtmp.ClientExceptionHandler;
+import org.red5.server.net.rtmp.RTMPClient;
import org.red5.server.net.rtmp.event.IRTMPEvent;
+import org.red5.server.net.rtmp.event.Notify;
+import org.red5.server.net.rtmp.status.StatusCodes;
import org.red5.server.stream.IBroadcastScope;
import org.red5.server.stream.StreamingProxy;
import org.red5.server.stream.message.RTMPMessage;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
public class ScopeApplicationAdapter extends ApplicationAdapter implements
@@ -121,6 +132,109 @@ public class ScopeApplicationAdapter ext
private static long broadCastCounter = 0;
public static boolean initComplete = false;
+ private static class MyStreamingProxy extends StreamingProxy { // StreamingProxy subclass whose RTMPClient gets a caller-supplied ClientExceptionHandler (see init)
+ private static Logger log =
LoggerFactory.getLogger(StreamingProxy.class); // NOTE(review): logger is registered under StreamingProxy.class, not MyStreamingProxy.class — confirm intended
+ private List<IMessage> frameBuffer = new ArrayList<IMessage>(); // messages pushMessage could not publish immediately; drained in onStreamEvent
+
+ private static final int STOPPED = 0; // state constants are ordered; stop() and pushMessage() compare them with >=
+ private static final int CONNECTING = 1; // set by start()
+ private static final int STREAM_CREATING = 2; // set when the "connect" result arrives
+ private static final int PUBLISHING = 3; // set when the "createStream" result arrives
+ private static final int PUBLISHED = 4; // set when NS_PUBLISH_START is received
+ private String host; // target host, passed to rtmpClient.connect in start()
+ private int port; // target port, passed to rtmpClient.connect in start()
+ private String app; // target application, used to build the default connection params
+ private RTMPClient rtmpClient; // created in init(); null until then
+ private int state; // one of the state constants above
+ private String publishName; // stream name given to start(); used for publish and FCPublish
+ private int streamId; // taken from the createStream result in resultReceived()
+ private String publishMode; // publish mode given to start(); passed to rtmpClient.publish
+
+ public void init(ClientExceptionHandler exceptionHandler) { // creates the RTMPClient and installs exceptionHandler on it; start() dereferences rtmpClient, so call this first
+ rtmpClient = new RTMPClient();
+ rtmpClient.setExceptionHandler(exceptionHandler);
+ state = STOPPED;
+ }
+
+ public synchronized void start(String publishName, String publishMode,
Object[] params) { // stores the publish name/mode and calls rtmpClient.connect with this object as callback
+ state = CONNECTING;
+ this.publishName = publishName;
+ this.publishMode = publishMode;
+
+ Map<String, Object> defParams =
rtmpClient.makeDefaultConnectionParams(host, port, app);
+ rtmpClient.connect(host, port, defParams, this, params); // the "connect" result is handled in resultReceived()
+ }
+
+ public synchronized void stop() { // disconnects only when the state reached STREAM_CREATING or later, then resets to STOPPED
+ if (state >= STREAM_CREATING) {
+ rtmpClient.disconnect();
+ }
+ state = STOPPED; // NOTE(review): frameBuffer is not cleared here — confirm whether leftover messages matter on restart
+ }
+
+ public void onPipeConnectionEvent(PipeConnectionEvent event) {
+ // nothing to do
+ }
+
+ public synchronized void pushMessage(IPipe pipe, IMessage message)
throws IOException { // publishes RTMPMessages directly once PUBLISHED; every other message is appended to frameBuffer
+ if (state >= PUBLISHED && message instanceof RTMPMessage) {
+ RTMPMessage rtmpMsg = (RTMPMessage) message;
+ rtmpClient.publishStreamData(streamId, rtmpMsg);
+ } else { // NOTE(review): also taken for non-RTMPMessage input after PUBLISHED; the buffer is only drained on NS_PUBLISH_START — confirm it cannot grow unbounded
+ frameBuffer.add(message);
+ }
+ }
+
+ public void onOOBControlMessage(IMessageComponent source, IPipe pipe,
OOBControlMessage oobCtrlMsg) { // no-op
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public void setApp(String app) {
+ this.app = app;
+ }
+
+ public synchronized void onStreamEvent(Notify notify) { // reacts to the status "code" carried in the notification's first call argument
+ log.debug("onStreamEvent: {}", notify);
+ ObjectMap<?, ?> map = (ObjectMap<?, ?>)
notify.getCall().getArguments()[0]; // assumes the first argument is an ObjectMap — TODO confirm for every notification type
+ String code = (String) map.get("code");
+ log.debug("<:{}", code);
+ if (StatusCodes.NS_PUBLISH_START.equals(code)) { // publish started: mark PUBLISHED, invoke FCPublish, then flush buffered messages
+ state = PUBLISHED;
+ rtmpClient.invoke("FCPublish", new Object[] {
publishName }, this);
+ while (frameBuffer.size() > 0) { // drains from the head, so messages go out in the order they were buffered
+ rtmpClient.publishStreamData(streamId,
frameBuffer.remove(0));
+ }
+ }
+ }
+
+ public synchronized void resultReceived(IPendingServiceCall call) { // advances the sequence: "connect" result -> createStream, "createStream" result -> publish
+ log.debug("resultReceived:> {}", call.getServiceMethodName());
+ if ("connect".equals(call.getServiceMethodName())) {
+ state = STREAM_CREATING;
+ rtmpClient.createStream(this);
+ } else if ("createStream".equals(call.getServiceMethodName())) {
+ state = PUBLISHING; // set before the result is inspected; reset to STOPPED below if the result is unusable
+ Object result = call.getResult();
+ if (result instanceof Integer) {
+ Integer streamIdInt = (Integer) result;
+ streamId = streamIdInt.intValue();
+ log.debug("Publishing: {}", state);
+ rtmpClient.publish(streamIdInt.intValue(),
publishName, publishMode, this);
+ } else { // createStream result is not an Integer: disconnect and return to STOPPED
+ rtmpClient.disconnect();
+ state = STOPPED;
+ }
+ }
+ }
+ }
+
public synchronized void resultReceived(IPendingServiceCall arg0) {
// TODO Auto-generated method stub
}
@@ -412,10 +526,12 @@ public class ScopeApplicationAdapter ext
flvRecorderService.recordMeetingStream(recordingName, "", false);
} else if
(Boolean.valueOf(map.get("startPublishing").toString())) {
- returnMap.put("modus",
"startPublishing");
- streamPublishingStart("" +
map.get("publishingHost")
+ if (streamPublishingStart("" +
map.get("publishingHost")
, "" + map.get("publishingApp")
- , "" + map.get("publishingId"));
+ , "" + map.get("publishingId")))
+ {
+ returnMap.put("modus",
"startPublishing");
+ }
}
return returnMap;
@@ -807,20 +923,32 @@ public class ScopeApplicationAdapter ext
}
}
- public void streamPublishingStart(String host, String app, String id) {
- IConnection current = Red5.getConnectionLocal();
- RoomClient rc =
clientListManager.getClientByStreamId(current.getClient().getId());
+ public boolean streamPublishingStart(String host, String app, String id) {
+ final boolean[] result = {true};
+ final IConnection conn = Red5.getConnectionLocal();
+ RoomClient rc =
clientListManager.getClientByStreamId(conn.getClient().getId());
String publishName = rc.getStreamPublishName();
if (rc.getIsScreenClient() && rc.isStartStreaming()) {
- IScope scope = current.getScope();
+ IScope scope = conn.getScope();
IBroadcastStream stream = getBroadcastStream(scope,
publishName);
IBroadcastScope bsScope = getBroadcastScope(scope, publishName);
- final StreamingProxy proxy = new StreamingProxy();
+ final MyStreamingProxy proxy = new MyStreamingProxy();
proxy.setHost(host);
proxy.setApp(app);
proxy.setPort(1935);
- proxy.init();
+ proxy.init(new ClientExceptionHandler() {
+ public void handleException(Throwable
throwable) {
+ result[0] = false;
+ HashMap<String, Object> params = new
HashMap<String, Object>();
+ params.put("stopPublishing", true);
+ params.put("error",
throwable.getMessage());
+
((IServiceCapableConnection)conn).invoke(
+ "screenSharerAction"
+ , new Object[] { params }
+ , ScopeApplicationAdapter.this);
+ }
+ });
bsScope.subscribe(proxy, null);
proxy.start(id, StreamingProxy.LIVE, null);
streamingProxyMap.put(publishName, proxy);
@@ -835,6 +963,7 @@ public class ScopeApplicationAdapter ext
}
});
}
+ return result[0];
}
public void streamPublishingStop() {
Modified: incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/CoreScreenShare.java
URL: http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/CoreScreenShare.java?rev=1338566&r1=1338565&r2=1338566&view=diff
==============================================================================
--- incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/CoreScreenShare.java (original)
+++ incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/CoreScreenShare.java Tue May 15 07:05:21 2012
@@ -286,9 +286,22 @@ public class CoreScreenShare {
return;
}
- if (invoke.getCall().getServiceMethodName()
- .equals("sendRemoteCursorEvent")) {
+ String method = invoke.getCall().getServiceMethodName();
+ if ("sendRemoteCursorEvent".equals(method)) {
sendRemoteCursorEvent(invoke.getCall().getArguments()[0]);
+ } else if ("screenSharerAction".equals(method)) {
+ Object[] args = invoke.getCall().getArguments();
+ if (args != null) {
+ @SuppressWarnings("unchecked")
+ HashMap<String, Object> params =
(HashMap<String, Object>)args[0];
+ if (params.containsKey("stopPublishing")
+ && Boolean.parseBoolean("" +
params.get("stopPublishing"))) {
+ frame.setPublishingStatus(false);
+ }
+ if (params.containsKey("error")) {
+ frame.setStatus("" +
params.get("error"));
+ }
+ }
}
}
Modified: incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/ScreenV1Encoder.java
URL: http://svn.apache.org/viewvc/incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/ScreenV1Encoder.java?rev=1338566&r1=1338565&r2=1338566&view=diff
==============================================================================
--- incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/ScreenV1Encoder.java (original)
+++ incubator/openmeetings/trunk/singlewebapp/src/org/openmeetings/screen/webstart/ScreenV1Encoder.java Tue May 15 07:05:21 2012
@@ -106,7 +106,7 @@ public class ScreenV1Encoder extends Bas
for (int y = area.y + area.height - 1; y >= area.y; --y) {
for (int x = area.x; x < area.x + area.width; ++x) {
int pixel = img.getRGB(x, y);
- if (!changed && pixel != last.getRGB(x, y)) {
+ if (!changed && (last == null || pixel !=
last.getRGB(x, y))) {
changed = true;
}
areaBuf[count++] = (byte)(pixel & 0xFF);
// Blue component