This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/master by this push:
     new c94c7e0  [OPENMEETINGS-1955] some work on recordings and reconnect
c94c7e0 is described below

commit c94c7e076a715e8efd24afcb05259a303bee9366
Author: Maxim Solodovnik <solomax...@gmail.com>
AuthorDate: Fri Jan 25 22:02:40 2019 +0700

    [OPENMEETINGS-1955] some work on recordings and reconnect
---
 openmeetings-core/pom.xml                          |  2 +-
 .../org/apache/openmeetings/core/remote/KRoom.java | 11 +++-
 .../openmeetings/core/remote/KurentoHandler.java   | 75 ++++++++++++++--------
 .../openmeetings/core/util/WebSocketHelper.java    | 30 +++++++--
 .../openmeetings/db/util/ws/RoomMessage.java       |  9 +++
 .../openmeetings/db/util/ws/TextRoomMessage.java   | 10 +++
 .../apache/openmeetings/web/room/RoomPanel.java    |  3 +
 7 files changed, 103 insertions(+), 37 deletions(-)

diff --git a/openmeetings-core/pom.xml b/openmeetings-core/pom.xml
index 982a7ec..fed91cb 100644
--- a/openmeetings-core/pom.xml
+++ b/openmeetings-core/pom.xml
@@ -98,7 +98,7 @@
                <dependency>
                        <groupId>org.kurento</groupId>
                        <artifactId>kurento-client</artifactId>
-                       <version>6.8.0</version>
+                       <version>6.9.0</version>
                </dependency>
                <!-- Test dependencies -->
                <dependency>
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index a9e215c..7d1e1e1 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -140,18 +140,23 @@ public class KRoom {
 
                        rec.setOwnerId(ownerId);
                        rec.setStatus(Recording.Status.RECORDING);
-                       rec = recDao.update(rec);
-                       // Receive recordingId
-                       recordingId = rec.getId();
                        log.debug("##REC:: recording created by USER: {}", ownerId);
 
                        for (final KStream stream : streams.values()) {
                                StreamDesc sd = c.getStream(stream.getUid());
+                               if (sd == null) {
+                                       continue;
+                               }
                                if (StreamType.SCREEN == sd.getType()) {
                                        sd.addActivity(Activity.RECORD);
+                                       rec.setWidth(sd.getWidth());
+                                       rec.setHeight(sd.getWidth());
                                }
                                stream.startRecord();
                        }
+                       rec = recDao.update(rec);
+                       // Receive recordingId
+                       recordingId = rec.getId();
 
                        // Send notification to all users that the recording has been started
                        WebSocketHelper.sendRoom(new RoomMessage(roomId, u, RoomMessage.Type.recordingToggled));
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 81cd1df..c376ee5 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -53,6 +53,7 @@ import org.apache.openmeetings.db.entity.record.Recording;
 import org.apache.openmeetings.db.entity.room.Room;
 import org.apache.openmeetings.db.entity.room.Room.Right;
 import org.apache.openmeetings.db.entity.room.Room.RoomElement;
+import org.apache.openmeetings.db.entity.user.User;
 import org.apache.openmeetings.db.manager.IClientManager;
 import org.apache.openmeetings.db.util.ws.RoomMessage;
 import org.apache.openmeetings.db.util.ws.TextRoomMessage;
@@ -99,11 +100,12 @@ public class KurentoHandler {
        private String turnMode;
        private int turnTtl = 60; //minutes
        private KurentoClient client;
+       private boolean connected = false;
        private String kuid;
        private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>();
        final Map<String, KStream> streamsByUid = new ConcurrentHashMap<>();
        final Map<String, KTestStream> testsByUid = new ConcurrentHashMap<>();
-       private Runnable check = null;
+       private Runnable check;
 
        @Autowired
        private IClientManager cm;
@@ -118,11 +120,21 @@ public class KurentoHandler {
        @Autowired
        private InterviewConverter interviewConverter;
 
+       private boolean isConnected() {
+               boolean connctd = client != null && !client.isClosed() && connected;
+               if (!connctd) {
+                       log.warn(WARN_NO_KURENTO);
+               }
+               return connctd;
+       }
+
        public void init() {
+               log.warn("!!!!!!!!!!!!!!!! Init");
                check = () -> {
+                       log.warn("!!!!!!!!!!!!!!!! CHECK !!!!!!!!!!!!!!!!!!!!");
                        try {
-                               client = KurentoClient.create(kurentoWsUrl, new KConnectionListener());
                                kuid = randomUUID().toString();
+                               client = KurentoClient.create(kurentoWsUrl, new KConnectionListener(kuid));
                                client.getServerManager().addObjectCreatedListener(new KWatchDog());
                        } catch (Exception e) {
                                log.warn("Fail to create Kurento client, will re-try in {} ms", checkTimeout);
@@ -134,6 +146,8 @@ public class KurentoHandler {
 
        public void destroy() {
                if (client != null) {
+                       kuid = randomUUID().toString(); // will be changed to prevent double events
+                       client.destroy();
                        for (Entry<Long, KRoom> e : rooms.entrySet()) {
                                e.getValue().close(this);
                        }
@@ -143,7 +157,7 @@ public class KurentoHandler {
                        }
                        testsByUid.clear();
                        streamsByUid.clear();
-                       client.destroy();
+                       client = null;
                }
        }
 
@@ -285,7 +299,7 @@ public class KurentoHandler {
        }
 
        public void onMessage(IWsClient _c, JSONObject msg) {
-               if (client == null) {
+               if (!isConnected()) {
                        sendError(_c, "Multimedia server is inaccessible");
                        return;
                }
@@ -308,8 +322,7 @@ public class KurentoHandler {
        }
 
        private void checkStreams(Long roomId) {
-               if (client == null) {
-                       log.warn(WARN_NO_KURENTO);
+               if (!isConnected()) {
                        return;
                }
                KRoom room = getRoom(roomId);
@@ -400,8 +413,7 @@ public class KurentoHandler {
        }
 
        public boolean recordingAllowed(Client c) {
-               if (client == null) {
-                       log.warn(WARN_NO_KURENTO);
+               if (!isConnected()) {
                        return false;
                }
                Room r = c.getRoom();
@@ -410,16 +422,14 @@ public class KurentoHandler {
        }
 
        public void startRecording(Client c) {
-               if (client == null) {
-                       log.warn(WARN_NO_KURENTO);
+               if (!isConnected()) {
                        return;
                }
                getRoom(c.getRoomId()).startRecording(c, recDao);
        }
 
        public void stopRecording(Client c) {
-               if (client == null) {
-                       log.warn(WARN_NO_KURENTO);
+               if (!isConnected()) {
                        return;
                }
                getRoom(c.getRoomId()).stopRecording(this, c, recDao);
@@ -431,24 +441,21 @@ public class KurentoHandler {
        }
 
        public boolean isRecording(Long roomId) {
-               if (client == null) {
-                       log.warn(WARN_NO_KURENTO);
+               if (!isConnected()) {
                        return false;
                }
                return getRoom(roomId).isRecording();
        }
 
        public JSONObject getRecordingUser(Long roomId) {
-               if (client == null) {
-                       log.warn(WARN_NO_KURENTO);
+               if (!isConnected()) {
                        return new JSONObject();
                }
                return getRoom(roomId).getRecordingUser();
        }
 
        public boolean screenShareAllowed(Client c) {
-               if (client == null) {
-                       log.warn(WARN_NO_KURENTO);
+               if (!isConnected()) {
                        return false;
                }
                Room r = c.getRoom();
@@ -459,7 +466,7 @@ public class KurentoHandler {
        }
 
        private void startSharing(Client c, JSONObject msg, Activity...activities) {
-               if (client != null && c.getRoomId() != null) {
+               if (isConnected() && c.getRoomId() != null) {
                        getRoom(c.getRoomId()).startSharing(this, cm, c, msg, activities);
                }
        }
@@ -480,8 +487,7 @@ public class KurentoHandler {
        }
 
        public boolean isSharing(Long roomId) {
-               if (client == null) {
-                       log.warn(WARN_NO_KURENTO);
+               if (!isConnected()) {
                        return false;
                }
                return getRoom(roomId).isSharing();
@@ -511,7 +517,7 @@ public class KurentoHandler {
        }
 
        public void remove(IWsClient _c) {
-               if (client == null ||_c == null) {
+               if (!isConnected() ||_c == null) {
                        return;
                }
                final String uid = _c.getUid();
@@ -658,25 +664,42 @@ public class KurentoHandler {
        }
 
        private class KConnectionListener implements KurentoConnectionListener {
+               final String lkuid;
+
+               private KConnectionListener(final String lkuid) {
+                       this.lkuid = lkuid;
+               }
+
+               private void notifyRooms() {
+                       WebSocketHelper.sendServer(new TextRoomMessage(null, new User(), RoomMessage.Type.kurentoStatus, new JSONObject().put("connected", isConnected()).toString()));
+               }
+
                @Override
                public void reconnected(boolean sameServer) {
-                       log.info("Kurento reconnected ? {}", sameServer);
+                       log.error("Kurento reconnected ? {}, this shouldn't happen", sameServer);
                }
 
                @Override
                public void disconnected() {
-                       log.warn("Disconnected, will re-try in {} ms", checkTimeout);
-                       recheckScheduler.schedule(check, checkTimeout, MILLISECONDS);
+                       if (lkuid.equals(kuid)) {
+                               log.warn("Disconnected, will re-try in {} ms", checkTimeout);
+                               connected = false;
+                               notifyRooms();
+                               destroy();
+                               recheckScheduler.schedule(check, checkTimeout, MILLISECONDS);
+                       }
                }
 
                @Override
                public void connectionFailed() {
-                       log.info("Kurento connectionFailed");
+                       // this handler seems to be called multiple times
                }
 
                @Override
                public void connected() {
                        log.info("Kurento connected");
+                       connected = true;
+                       notifyRooms();
                }
        }
 
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
index 0107017..d0a9519 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/util/WebSocketHelper.java
@@ -182,6 +182,17 @@ public class WebSocketHelper {
                sendRoom(m.getRoomId(), (t, c) -> t.sendMessage(m), null);
        }
 
+       public static void sendServer(final RoomMessage m) {
+               log.debug("Sending WebSocket message: {}", m);
+               sendAll(c -> {
+                       try {
+                               c.sendMessage(m);
+                       } catch (Exception e) {
+                               log.error("Error while sending message to Server", e);
+                       }
+               });
+       }
+
        public static void sendRoom(final Long roomId, final JSONObject m) {
                sendRoom(roomId, m, true);
        }
@@ -242,19 +253,24 @@ public class WebSocketHelper {
                if (publish) {
                        publish(new WsMessageAll(m));
                }
+               log.debug("Sending WebSocket message: {}", m);
+               sendAll(c -> {
+                       try {
+                               c.sendMessage(m);
+                       } catch (Exception e) {
+                               log.error("Error while sending message to ALL", e);
+                       }
+               });
+       }
+
+       private static void sendAll(Consumer<IWebSocketConnection> sender) {
                new Thread(() -> {
                        Application app = (Application)getApp();
                        WebSocketSettings settings = WebSocketSettings.Holder.get(app);
                        IWebSocketConnectionRegistry reg = settings.getConnectionRegistry();
                        Executor executor = settings.getWebSocketPushMessageExecutor();
                        for (IWebSocketConnection c : reg.getConnections(app)) {
-                               executor.run(() -> {
-                                       try {
-                                               c.sendMessage(m);
-                                       } catch (Exception e) {
-                                               log.error("Error while sending message to ALL", e);
-                                       }
-                               });
+                               executor.run(() -> sender.accept(c));
                        }
                }).start();
        }
diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java
index 4436979..1762dd8 100644
--- a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java
+++ b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/RoomMessage.java
@@ -53,6 +53,7 @@ public class RoomMessage implements IWebSocketPushMessage {
                , mute
                , exclusive
                , quickPollUpdated
+               , kurentoStatus
        }
        private final Date timestamp;
        private final String uid;
@@ -105,4 +106,12 @@ public class RoomMessage implements IWebSocketPushMessage {
        public String getUid() {
                return uid;
        }
+
+       @Override
+       public String toString() {
+               return new StringBuilder().append("RoomMessage [roomId=").append(roomId)
+                               .append(", userId=").append(userId)
+                               .append(", type=").append(type).append("]")
+                               .toString();
+       }
 }
diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/TextRoomMessage.java b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/TextRoomMessage.java
index f2dbd77..06d7a16 100644
--- a/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/TextRoomMessage.java
+++ b/openmeetings-db/src/main/java/org/apache/openmeetings/db/util/ws/TextRoomMessage.java
@@ -38,4 +38,14 @@ public class TextRoomMessage extends RoomMessage {
        public String getText() {
                return text;
        }
+
+       @Override
+       public String toString() {
+               return new StringBuilder()
+                               .append("TextRoomMessage [text=").append(text)
+                               .append(", getRoomId()=").append(getRoomId())
+                               .append(", getUserId()=").append(getUserId())
+                               .append(", getType()=").append(getType()).append("]")
+                               .toString();
+       }
 }
diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
index 4e287fc..16b30af 100644
--- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
+++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/RoomPanel.java
@@ -533,6 +533,9 @@ public class RoomPanel extends BasePanel {
                                                handler.appendJavaScript(getQuickPollJs());
                                        }
                                                break;
+                                       case kurentoStatus:
+                                               menu.update(handler);
+                                               break;
                                }
                        }
                }

Reply via email to