This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b0c13e  [OPENMEETINGS-2301] code is improved a bit (original issue still reproducible)
8b0c13e is described below

commit 8b0c13e05f0d6da1f561b54ce2fe8b4b2c7192de
Author: Maxim Solodovnik <solomax...@gmail.com>
AuthorDate: Mon May 4 21:08:23 2020 +0700

    [OPENMEETINGS-2301] code is improved a bit (original issue still reproducible)
---
 .../openmeetings/core/remote/KurentoHandler.java   | 37 +++++++++++-----------
 .../openmeetings/core/remote/StreamProcessor.java  | 12 +++++--
 2 files changed, 28 insertions(+), 21 deletions(-)

diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 348f3e3..0276767 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -122,16 +122,19 @@ public class KurentoHandler {
        public void init() {
                check = () -> {
                        try {
-                               if (isConnected()) {
+                               if (client != null) {
                                        return;
                                }
+                               log.debug("Reconnecting KMS");
                                kuid = randomUUID().toString();
-                               client = KurentoClient.create(kurentoWsUrl, new 
KConnectionListener(kuid));
+                               client = KurentoClient.create(kurentoWsUrl, new 
KConnectionListener());
                                
client.getServerManager().addObjectCreatedListener(new KWatchDogCreate());
                                
client.getServerManager().addObjectDestroyedListener(event ->
                                        
log.debug("Kurento::ObjectDestroyedEvent objectId {}, tags {}, source {}", 
event.getObjectId(), event.getTags(), event.getSource())
                                );
                        } catch (Exception e) {
+                               connected = false;
+                               clean();
                                log.warn("Fail to create Kurento client, will 
re-try in {} ms", checkTimeout);
                        }
                };
@@ -146,15 +149,19 @@ public class KurentoHandler {
 
        private void clean() {
                if (client != null) {
-                       kuid = randomUUID().toString(); // will be changed to 
prevent double events
-                       client.destroy();
+                       KurentoClient copy = client;
+                       client = null;
+                       testProcessor.destroy();
+                       streamProcessor.destroy();
                        for (Entry<Long, KRoom> e : rooms.entrySet()) {
                                e.getValue().close();
                        }
-                       testProcessor.destroy();
-                       streamProcessor.destroy();
                        rooms.clear();
-                       client = null;
+                       if (copy != null && !copy.isClosed()) {
+                               log.debug("Client will destroyed ...");
+                               copy.destroy();
+                               log.debug(".... Client is destroyed");
+                       }
                }
        }
 
@@ -223,7 +230,7 @@ public class KurentoHandler {
        }
 
        public void remove(IWsClient c) {
-               if (!isConnected() ||c == null) {
+               if (!isConnected() || c == null) {
                        return;
                }
                if (!(c instanceof Client)) {
@@ -365,12 +372,6 @@ public class KurentoHandler {
        }
 
        private class KConnectionListener implements KurentoConnectionListener {
-               final String lkuid;
-
-               private KConnectionListener(final String lkuid) {
-                       this.lkuid = lkuid;
-               }
-
                private void notifyRooms() {
                        WebSocketHelper.sendServer(new TextRoomMessage(null, 
new User(), RoomMessage.Type.KURENTO_STATUS, new JSONObject().put("connected", 
isConnected()).toString()));
                }
@@ -383,11 +384,9 @@ public class KurentoHandler {
                @Override
                public void disconnected() {
                        log.info("Kurento disconnected");
-                       if (lkuid.equals(kuid)) {
-                               connected = false;
-                               notifyRooms();
-                               clean();
-                       }
+                       connected = false;
+                       notifyRooms();
+                       clean();
                }
 
                @Override
diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 4fa49fe..729e77a 100644
--- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -174,8 +174,11 @@ public class StreamProcessor implements IStreamProcessor {
        }
 
        private void handleBroadcastStarted(Client c, final String uid, 
JSONObject msg) {
+               if (!kHandler.isConnected()) {
+                       return;
+               }
                StreamDesc sd = c.getStream(uid);
-               KStream sender= getByUid(uid);
+               KStream sender = getByUid(uid);
                try {
                        if (sender == null) {
                                KRoom room = kHandler.getRoom(c.getRoomId());
@@ -230,6 +233,9 @@ public class StreamProcessor implements IStreamProcessor {
 
        public void toggleActivity(Client c, Activity a) {
                log.info("PARTICIPANT {}: trying to toggle activity {}", c, a);
+               if (!kHandler.isConnected()) {
+                       return;
+               }
 
                if (activityAllowed(c, a, c.getRoom())) {
                        boolean wasBroadcasting = isBroadcasting(c);
@@ -567,6 +573,8 @@ public class StreamProcessor implements IStreamProcessor {
 
        @Override
        public void destroy() {
-               streamByUid.clear();
+               for (Map.Entry<String, KStream> e : streamByUid.entrySet()) {
+                       release(e.getValue(), true);
+               }
        }
 }

Reply via email to