This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push: new 8b0c13e [OPENMEETINGS-2301] code is improved a bit (original issue still reproducible) 8b0c13e is described below commit 8b0c13e05f0d6da1f561b54ce2fe8b4b2c7192de Author: Maxim Solodovnik <solomax...@gmail.com> AuthorDate: Mon May 4 21:08:23 2020 +0700 [OPENMEETINGS-2301] code is improved a bit (original issue still reproducible) --- .../openmeetings/core/remote/KurentoHandler.java | 37 +++++++++++----------- .../openmeetings/core/remote/StreamProcessor.java | 12 +++++-- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index 348f3e3..0276767 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -122,16 +122,19 @@ public class KurentoHandler { public void init() { check = () -> { try { - if (isConnected()) { + if (client != null) { return; } + log.debug("Reconnecting KMS"); kuid = randomUUID().toString(); - client = KurentoClient.create(kurentoWsUrl, new KConnectionListener(kuid)); + client = KurentoClient.create(kurentoWsUrl, new KConnectionListener()); client.getServerManager().addObjectCreatedListener(new KWatchDogCreate()); client.getServerManager().addObjectDestroyedListener(event -> log.debug("Kurento::ObjectDestroyedEvent objectId {}, tags {}, source {}", event.getObjectId(), event.getTags(), event.getSource()) ); } catch (Exception e) { + connected = false; + clean(); log.warn("Fail to create Kurento client, will re-try in {} ms", checkTimeout); } }; @@ -146,15 +149,19 @@ public class KurentoHandler { private void clean() { if (client != null) { - kuid = randomUUID().toString(); // will be changed to prevent double events - client.destroy(); + 
KurentoClient copy = client; + client = null; + testProcessor.destroy(); + streamProcessor.destroy(); for (Entry<Long, KRoom> e : rooms.entrySet()) { e.getValue().close(); } - testProcessor.destroy(); - streamProcessor.destroy(); rooms.clear(); - client = null; + if (copy != null && !copy.isClosed()) { + log.debug("Client will destroyed ..."); + copy.destroy(); + log.debug(".... Client is destroyed"); + } } } @@ -223,7 +230,7 @@ public class KurentoHandler { } public void remove(IWsClient c) { - if (!isConnected() ||c == null) { + if (!isConnected() || c == null) { return; } if (!(c instanceof Client)) { @@ -365,12 +372,6 @@ public class KurentoHandler { } private class KConnectionListener implements KurentoConnectionListener { - final String lkuid; - - private KConnectionListener(final String lkuid) { - this.lkuid = lkuid; - } - private void notifyRooms() { WebSocketHelper.sendServer(new TextRoomMessage(null, new User(), RoomMessage.Type.KURENTO_STATUS, new JSONObject().put("connected", isConnected()).toString())); } @@ -383,11 +384,9 @@ public class KurentoHandler { @Override public void disconnected() { log.info("Kurento disconnected"); - if (lkuid.equals(kuid)) { - connected = false; - notifyRooms(); - clean(); - } + connected = false; + notifyRooms(); + clean(); } @Override diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java index 4fa49fe..729e77a 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java @@ -174,8 +174,11 @@ public class StreamProcessor implements IStreamProcessor { } private void handleBroadcastStarted(Client c, final String uid, JSONObject msg) { + if (!kHandler.isConnected()) { + return; + } StreamDesc sd = c.getStream(uid); - KStream sender= getByUid(uid); + 
KStream sender = getByUid(uid); try { if (sender == null) { KRoom room = kHandler.getRoom(c.getRoomId()); @@ -230,6 +233,9 @@ public class StreamProcessor implements IStreamProcessor { public void toggleActivity(Client c, Activity a) { log.info("PARTICIPANT {}: trying to toggle activity {}", c, a); + if (!kHandler.isConnected()) { + return; + } if (activityAllowed(c, a, c.getRoom())) { boolean wasBroadcasting = isBroadcasting(c); @@ -567,6 +573,8 @@ public class StreamProcessor implements IStreamProcessor { @Override public void destroy() { - streamByUid.clear(); + for (Map.Entry<String, KStream> e : streamByUid.entrySet()) { + release(e.getValue(), true); + } } }