This is an automated email from the ASF dual-hosted git repository. solomax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push: new c94c7e0 [OPENMEETINGS-1955] some work on recordings and reconnect c94c7e0 is described below commit c94c7e076a715e8efd24afcb05259a303bee9366 Author: Maxim Solodovnik <solomax...@gmail.com> AuthorDate: Fri Jan 25 22:02:40 2019 +0700 [OPENMEETINGS-1955] some work on recordings and reconnect --- openmeetings-core/pom.xml | 2 +- .../org/apache/openmeetings/core/remote/KRoom.java | 11 +++- .../openmeetings/core/remote/KurentoHandler.java | 75 ++++++++++++++-------- .../openmeetings/core/util/WebSocketHelper.java | 30 +++++++-- .../openmeetings/db/util/ws/RoomMessage.java | 9 +++ .../openmeetings/db/util/ws/TextRoomMessage.java | 10 +++ .../apache/openmeetings/web/room/RoomPanel.java | 3 + 7 files changed, 103 insertions(+), 37 deletions(-) diff --git a/openmeetings-core/pom.xml b/openmeetings-core/pom.xml index 982a7ec..fed91cb 100644 --- a/openmeetings-core/pom.xml +++ b/openmeetings-core/pom.xml @@ -98,7 +98,7 @@ <dependency> <groupId>org.kurento</groupId> <artifactId>kurento-client</artifactId> - <version>6.8.0</version> + <version>6.9.0</version> </dependency> <!-- Test dependencies --> <dependency> diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java index a9e215c..7d1e1e1 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java @@ -140,18 +140,23 @@ public class KRoom { rec.setOwnerId(ownerId); rec.setStatus(Recording.Status.RECORDING); - rec = recDao.update(rec); - // Receive recordingId - recordingId = rec.getId(); log.debug("##REC:: recording created by USER: {}", ownerId); for (final KStream stream : streams.values()) { StreamDesc sd = c.getStream(stream.getUid()); + if (sd == null) { + continue; + } if (StreamType.SCREEN == sd.getType()) { sd.addActivity(Activity.RECORD); + rec.setWidth(sd.getWidth()); + rec.setHeight(sd.getWidth()); } stream.startRecord(); } + rec = recDao.update(rec); + // Receive recordingId + recordingId = rec.getId(); // Send notification to all users that the recording has been started WebSocketHelper.sendRoom(new RoomMessage(roomId, u, RoomMessage.Type.recordingToggled)); diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index 81cd1df..c376ee5 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -53,6 +53,7 @@ import org.apache.openmeetings.db.entity.record.Recording; import org.apache.openmeetings.db.entity.room.Room; import org.apache.openmeetings.db.entity.room.Room.Right; import org.apache.openmeetings.db.entity.room.Room.RoomElement; +import org.apache.openmeetings.db.entity.user.User; import org.apache.openmeetings.db.manager.IClientManager; import org.apache.openmeetings.db.util.ws.RoomMessage; import org.apache.openmeetings.db.util.ws.TextRoomMessage; @@ -99,11 +100,12 @@ public class KurentoHandler { private String turnMode; private int turnTtl = 60; //minutes private KurentoClient client; + private boolean connected = false; private String kuid; private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>(); final Map<String, KStream> streamsByUid = new ConcurrentHashMap<>(); final Map<String, KTestStream> testsByUid = new ConcurrentHashMap<>(); - private Runnable check = null; + private Runnable check; @Autowired private IClientManager cm; @@ -118,11 +120,21 @@ public class KurentoHandler { @Autowired private InterviewConverter interviewConverter; + private boolean isConnected() { + boolean connctd = client != null && !client.isClosed() && connected; + if (!connctd) { + log.warn(WARN_NO_KURENTO); + } + return connctd; + } + public void init() { + log.warn("!!!!!!!!!!!!!!!! Init"); check = () -> { + log.warn("!!!!!!!!!!!!!!!! CHECK !!!!!!!!!!!!!!!!!!!!"); try { - client = KurentoClient.create(kurentoWsUrl, new KConnectionListener()); kuid = randomUUID().toString(); + client = KurentoClient.create(kurentoWsUrl, new KConnectionListener(kuid)); client.getServerManager().addObjectCreatedListener(new KWatchDog()); } catch (Exception e) { log.warn("Fail to create Kurento client, will re-try in {} ms", checkTimeout); @@ -134,6 +146,8 @@ public class KurentoHandler { public void destroy() { if (client != null) { + kuid = randomUUID().toString(); // will be changed to prevent double events + client.destroy(); for (Entry<Long, KRoom> e : rooms.entrySet()) { e.getValue().close(this); } @@ -143,7 +157,7 @@ public class KurentoHandler { } testsByUid.clear(); streamsByUid.clear(); - client.destroy(); + client = null; } } @@ -285,7 +299,7 @@ public class KurentoHandler { } public void onMessage(IWsClient _c, JSONObject msg) { - if (client == null) { + if (!isConnected()) { sendError(_c, "Multimedia server is inaccessible"); return; } @@ -308,8 +322,7 @@ public class KurentoHandler { } private void checkStreams(Long roomId) { - if (client == null) { - log.warn(WARN_NO_KURENTO); + if (!isConnected()) { return; } KRoom room = getRoom(roomId); @@ -400,8 +413,7 @@ public class KurentoHandler { } public boolean recordingAllowed(Client c) { - if (client == null) { - log.warn(WARN_NO_KURENTO); + if (!isConnected()) { return false; } Room r = c.getRoom(); @@ -410,16 +422,14 @@ public class KurentoHandler { } public void startRecording(Client c) { - if (client == null) { - log.warn(WARN_NO_KURENTO); + if (!isConnected()) { return; } getRoom(c.getRoomId()).startRecording(c, recDao); } public void stopRecording(Client c) { - if (client == null) { - log.warn(WARN_NO_KURENTO); + if (!isConnected()) { return; } getRoom(c.getRoomId()).stopRecording(this, c, recDao); @@ -431,24 +441,21 @@ public class KurentoHandler { } public boolean isRecording(Long roomId) { - if (client == null) { - log.warn(WARN_NO_KURENTO); + if (!isConnected()) { return false; } return getRoom(roomId).isRecording(); } public JSONObject getRecordingUser(Long roomId) { - if (client == null) { - log.warn(WARN_NO_KURENTO); + if (!isConnected()) { return new JSONObject(); } return getRoom(roomId).getRecordingUser(); } public boolean screenShareAllowed(Client c) { - if (client == null) { - log.warn(WARN_NO_KURENTO); + if (!isConnected()) { return false; } Room r = c.getRoom(); @@ -459,7 +466,7 @@ public class KurentoHandler { } private void startSharing(Client c, JSONObject msg, Activity...activities) { - if (client != null && c.getRoomId() != null) { + if (isConnected() && c.getRoomId() != null) { getRoom(c.getRoomId()).startSharing(this, cm, c, msg, activities); } } @@ -480,8 +487,7 @@ public class KurentoHandler { } public boolean isSharing(Long roomId) { - if (client == null) { - log.warn(WARN_NO_KURENTO); + if (!isConnected()) { return false; } return getRoom(roomId).isSharing(); @@ -511,7 +517,7 @@ public class KurentoHandler { } public void remove(IWsClient _c) { - if (client == null ||_c == null) { + if (!isConnected() ||_c == null) { return; } final String uid = _c.getUid(); @@ -658,25 +664,42 @@ public class KurentoHandler { } private class KConnectionListener implements KurentoConnectionListener { + final String lkuid; + + private KConnectionListener(final String lkuid) { + this.lkuid = lkuid; + } + + private void notifyRooms() { + WebSocketHelper.sendServer(new TextRoomMessage(null, new User(), RoomMessage.Type.kurentoStatus, new JSONObject().put("connected", isConnected()).toString())); + } + @Override public void reconnected(boolean sameServer) { - log.info("Kurento reconnected ? {}", sameServer); + log.error("Kurento reconnected ? {}, this shouldn't happen", sameServer); } @Override public void disconnected() { - log.warn("Disconnected, will re-try in {} ms", checkTimeout); - recheckScheduler.schedule(check, checkTimeout, MILLISECONDS); + if (lkuid.equals(kuid)) { + log.warn("Disconnected, will re-try in {} ms", checkTimeout); + connected = false; + notifyRooms(); + destroy(); + recheckScheduler.schedule(check, checkTimeout, MILLISECONDS); + } } @Override public void connectionFailed() { - log.info("Kurento connectionFailed"); + // this handled seems to be called multiple times } @Override public void connected() { log.info("Kurento connected"); + connected = true; + notifyRooms(); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java index 0107017..d0a9519 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java @@ -182,6 +182,17 @@ public class WebSocketHelper { sendRoom(m.getRoomId(), (t, c) -> t.sendMessage(m), null); } + public static void sendServer(final RoomMessage m) { + log.debug("Sending WebSocket message: {}", m); + sendAll(c -> { + try { + c.sendMessage(m); + } catch (Exception e) { + log.error("Error while sending message to Server", e); + } + }); + } + public static void sendRoom(final Long roomId, final JSONObject m) { sendRoom(roomId, m, true); } @@ -242,19 +253,24 @@ public class WebSocketHelper { if (publish) { publish(new WsMessageAll(m)); } + log.debug("Sending WebSocket message: {}", m); + sendAll(c -> { + try { + c.sendMessage(m); + } catch (Exception e) { + log.error("Error while sending message to ALL", e); + } + }); + } + + private static void sendAll(Consumer<IWebSocketConnection> sender) { new Thread(() -> { Application app = (Application)getApp(); WebSocketSettings settings = WebSocketSettings.Holder.get(app); IWebSocketConnectionRegistry reg = settings.getConnectionRegistry(); Executor executor = settings.getWebSocketPushMessageExecutor(); for (IWebSocketConnection c : reg.getConnections(app)) { - executor.run(() -> { - try { - c.sendMessage(m); - } catch (Exception e) { - log.error("Error while sending message to ALL", e); - } - }); + executor.run(() -> sender.accept(c)); } }).start(); } diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java index 4436979..1762dd8 100644 --- a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java +++ b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java @@ -53,6 +53,7 @@ public class RoomMessage implements IWebSocketPushMessage { , mute , exclusive , quickPollUpdated + , kurentoStatus } private final Date timestamp; private final String uid; @@ -105,4 +106,12 @@ public class RoomMessage implements IWebSocketPushMessage { public String getUid() { return uid; } + + @Override + public String toString() { + return new StringBuilder().append("RoomMessage [roomId=").append(roomId) + .append(", userId=").append(userId) + .append(", type=").append(type).append("]") + .toString(); + } } diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/TextRoomMessage.java b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/TextRoomMessage.java index f2dbd77..06d7a16 100644 --- a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/TextRoomMessage.java +++ b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/TextRoomMessage.java @@ -38,4 +38,14 @@ public class TextRoomMessage extends RoomMessage { public String getText() { return text; } + + @Override + public String toString() { + return new StringBuilder() + .append("TextRoomMessage [text=").append(text) + .append(", getRoomId()=").append(getRoomId()) + .append(", getUserId()=").append(getUserId()) + .append(", getType()=").append(getType()).append("]") + .toString(); + } } diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java index 4e287fc..16b30af 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java @@ -533,6 +533,9 @@ public class RoomPanel extends BasePanel { handler.appendJavaScript(getQuickPollJs()); } break; + case kurentoStatus: + menu.update(handler); + break; } } }