This is an automated email from the ASF dual-hosted git repository. solomax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push: new a73330b OPENMEETINGS-2315 Only StreamProcessor to hold reference of KStream (#71) a73330b is described below commit a73330b84ad31ec2bab0d40168446c4e5b449eaa Author: Sebastian Wagner <sebawag...@apache.org> AuthorDate: Fri May 1 13:18:16 2020 +1200 OPENMEETINGS-2315 Only StreamProcessor to hold reference of KStream (#71) * OPENMEETINGS-2315 Only StreamProcessor to hold reference of KStream * OPENMEETINGS-2315 Remove uncommented code to add stream that won't be needed anymore * OPENMEETINGS-2315 Fix stopBroadcast to release processor. And move to StreamProcessor. * OPENMEETINGS-2315 Rename KRoom.streamProcessor to KRoom.processor. * OPENMEETINGS-2315 Fix review comments on method modifiers and formatting. * OPENMEETINGS-2315 Fix circular reference. * OPENMEETINGS-2315 Fix review comments * OPENMEETINGS-2315 fix some linebreak * OPENMEETINGS-2315 fix some linebreak2 * OPENMEETINGS-2315 change list init. --- .../openmeetings/core/remote/IStreamProcessor.java | 2 +- .../org/apache/openmeetings/core/remote/KRoom.java | 62 +++++++++------------- .../apache/openmeetings/core/remote/KStream.java | 9 ++-- .../openmeetings/core/remote/KTestStream.java | 2 +- .../openmeetings/core/remote/KurentoHandler.java | 6 +-- .../openmeetings/core/remote/StreamProcessor.java | 30 +++++++---- .../core/remote/TestStreamProcessor.java | 2 +- .../web/admin/connection/ConnectionsPanel.java | 8 --- 8 files changed, 55 insertions(+), 66 deletions(-) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java index d6575ed..f142285 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java @@ -19,6 +19,6 @@ package org.apache.openmeetings.core.remote; 
public interface IStreamProcessor { - void release(AbstractStream stream); + void release(AbstractStream stream, boolean releaseStream); void destroy(); } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java index 47abfdd..f248a61 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java @@ -27,9 +27,7 @@ import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import java.util.Collection; import java.util.Date; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.openmeetings.core.util.WebSocketHelper; @@ -53,10 +51,18 @@ import org.slf4j.LoggerFactory; import com.github.openjson.JSONObject; +/** + * Bean object dynamically created representing a conference room on the MediaServer + * + */ public class KRoom { + private static final Logger log = LoggerFactory.getLogger(KRoom.class); - private final Map<String, KStream> streams = new ConcurrentHashMap<>(); + /** + * Not injected by annotation but by constructor. 
+ */ + private final StreamProcessor processor; private final MediaPipeline pipeline; private final Long roomId; private final Room.Type type; @@ -67,7 +73,8 @@ public class KRoom { private JSONObject recordingUser = new JSONObject(); private JSONObject sharingUser = new JSONObject(); - public KRoom(Room r, MediaPipeline pipeline, RecordingChunkDao chunkDao) { + public KRoom(StreamProcessor processor, Room r, MediaPipeline pipeline, RecordingChunkDao chunkDao) { + this.processor = processor; this.roomId = r.getId(); this.type = r.getType(); this.pipeline = pipeline; @@ -98,17 +105,16 @@ public class KRoom { public KStream join(final StreamDesc sd) { log.info("ROOM {}: join client {}, stream: {}", roomId, sd.getClient(), sd.getUid()); final KStream stream = new KStream(sd, this); - streams.put(stream.getUid(), stream); + processor.addStream(stream); return stream; } public Collection<KStream> getParticipants() { - return streams.values(); + return processor.getByRoom(this.getRoomId()); } - public void onStopBroadcast(KStream stream, final StreamProcessor processor) { - streams.remove(stream.getUid()); - stream.release(processor); + public void onStopBroadcast(KStream stream) { + processor.release(stream, true); WebSocketHelper.sendAll(newKurentoMsg() .put("id", "broadcastStopped") .put("uid", stream.getUid()) @@ -118,21 +124,6 @@ public class KRoom { //FIXME TODO permission can be removed, some listener might be required } - public void leave(final StreamProcessor processor, final Client c) { - for (Map.Entry<String, KStream> e : streams.entrySet()) { - e.getValue().remove(c); - } - for (StreamDesc sd : c.getStreams()) { - if (StreamType.SCREEN == sd.getType()) { - - } - KStream stream = streams.remove(sd.getUid()); - if (stream != null) { - stream.release(processor); - } - } - } - public boolean isRecording() { return recordingStarted.get(); } @@ -141,7 +132,7 @@ public class KRoom { return new JSONObject(recordingUser.toString()); } - public void 
startRecording(StreamProcessor processor, Client c) { + public void startRecording(Client c) { if (recordingStarted.compareAndSet(false, true)) { log.debug("##REC:: recording in room {} is starting ::", roomId); Room r = c.getRoom(); @@ -180,9 +171,9 @@ public class KRoom { rec = processor.getRecordingDao().update(rec); // Receive recordingId recordingId = rec.getId(); - for (final KStream stream : streams.values()) { - stream.startRecord(processor); - } + processor.getByRoom(this.getRoomId()).forEach( + stream -> stream.startRecord(processor) + ); // Send notification to all users that the recording has been started WebSocketHelper.sendRoom(new RoomMessage(roomId, u, RoomMessage.Type.RECORDING_TOGGLED)); @@ -190,12 +181,10 @@ public class KRoom { } } - public void stopRecording(final StreamProcessor processor, Client c) { + public void stopRecording(Client c) { if (recordingStarted.compareAndSet(true, false)) { log.debug("##REC:: recording in room {} is stopping {} ::", roomId, recordingId); - for (final KStream stream : streams.values()) { - stream.stopRecord(); - } + processor.getByRoom(this.getRoomId()).forEach(KStream::stopRecord); Recording rec = processor.getRecordingDao().get(recordingId); rec.setRecordEnd(new Date()); rec = processor.getRecordingDao().update(rec); @@ -270,11 +259,10 @@ public class KRoom { } } - public void close(final StreamProcessor processor) { - for (final KStream stream : streams.values()) { - stream.release(processor); - } - streams.clear(); + public void close() { + processor.getByRoom(this.getRoomId()).forEach( + stream -> stream.release(processor) + ); pipeline.release(new Continuation<Void>() { @Override public void onSuccess(Void result) throws Exception { diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java index 397d25f..fd4fa65 100644 --- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java @@ -130,7 +130,7 @@ public class KStream extends AbstractStream { if (StreamType.SCREEN == streamType) { processor.doStopSharing(sid, uid); } - stopBroadcast(processor); + stopBroadcast(); return null; }, delayedExecutor(getFlowoutTimeout(), TimeUnit.SECONDS))); break; @@ -144,7 +144,6 @@ public class KStream extends AbstractStream { } }); outgoingMedia.addMediaFlowInStateChangeListener(evt -> log.warn("Media FlowIn :: {}", evt)); - processor.addStream(this); addListener(processor, sd.getSid(), sd.getUid(), sdpOffer); if (room.isRecording()) { startRecord(processor); @@ -282,8 +281,8 @@ public class KStream extends AbstractStream { } } - public void stopBroadcast(final StreamProcessor processor) { - room.onStopBroadcast(this, processor); + public void stopBroadcast() { + room.onStopBroadcast(this); } public void pauseSharing() { @@ -321,7 +320,7 @@ public class KStream extends AbstractStream { } releaseRecorder(); if (remove) { - processor.release(this); + processor.release(this, false); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java index 7da1a1e..0839693 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java @@ -223,6 +223,6 @@ public class KTestStream extends AbstractStream { } releasePlayer(); releaseRecorder(); - processor.release(this); + processor.release(this, true); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index ade553e..e7d60405 100644 --- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -142,7 +142,7 @@ public class KurentoHandler { kuid = randomUUID().toString(); // will be changed to prevent double events client.destroy(); for (Entry<Long, KRoom> e : rooms.entrySet()) { - e.getValue().close(streamProcessor); + e.getValue().close(); } testProcessor.destroy(); streamProcessor.destroy(); @@ -238,7 +238,7 @@ public class KurentoHandler { pipe.addTag(t, TAG_KUID, kuid); pipe.addTag(t, TAG_ROOM, String.valueOf(roomId)); t.commit(); - room = new KRoom(r, pipe, chunkDao); + room = new KRoom(streamProcessor, r, pipe, chunkDao); rooms.put(roomId, room); } log.debug("Room {} found!", roomId); @@ -426,7 +426,7 @@ public class KurentoHandler { return; } else if (r != null) { rooms.remove(r.getRoomId()); - r.close(streamProcessor); + r.close(); } } log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags); diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java index 32cda74..caceb36 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java @@ -65,6 +65,9 @@ import com.github.openjson.JSONObject; @Component public class StreamProcessor implements IStreamProcessor { private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class); + /** + * Holds a reference to the current streams available on the server instance + */ private final Map<String, KStream> streamByUid = new ConcurrentHashMap<>(); @Autowired @@ -214,7 +217,7 @@ public class StreamProcessor implements IStreamProcessor { .forEach(lsd -> { KStream s = getByUid(lsd.getUid()); if (s != null) { - 
s.stopBroadcast(this); + s.stopBroadcast(); } c.removeStream(lsd.getUid()); closed.add(lsd.getUid()); @@ -284,7 +287,7 @@ public class StreamProcessor implements IStreamProcessor { KStream stream = streamByUid.get(sd.getUid()); if (stream != null) { KRoom room = kHandler.getRoom(c.getRoomId()); - room.onStopBroadcast(stream, this); + room.onStopBroadcast(stream); } }); } @@ -305,7 +308,7 @@ public class StreamProcessor implements IStreamProcessor { room.stopSharing(); if (Room.Type.INTERVIEW != room.getType() && room.isRecording()) { log.info("No more screen streams in the non-interview room, stopping recording"); - room.stopRecording(this, null); + room.stopRecording(null); } } } @@ -315,7 +318,7 @@ public class StreamProcessor implements IStreamProcessor { .collect(Collectors.toList()); if (streams.isEmpty()) { log.info("No more streams in the room, stopping recording"); - room.stopRecording(this, null); + room.stopRecording(null); } } } @@ -394,7 +397,7 @@ public class StreamProcessor implements IStreamProcessor { KStream sender = getByUid(uid); StreamDesc sd = doStopSharing(c.getSid(), uid); if (sender != null && sd != null) { - sender.stopBroadcast(this); + sender.stopBroadcast(); } else { log.warn("Could not stop broadcast - could be a KStream leak and lead to ghost KStream, client: {}, uid: {} ", c, uid); } @@ -447,14 +450,14 @@ public class StreamProcessor implements IStreamProcessor { if (!kHandler.isConnected() || !hasRightsToRecord(c)) { return; } - kHandler.getRoom(c.getRoomId()).startRecording(this, c); + kHandler.getRoom(c.getRoomId()).startRecording(c); } public void stopRecording(Client c) { if (!kHandler.isConnected() || !hasRightsToRecord(c)) { return; } - kHandler.getRoom(c.getRoomId()).stopRecording(this, c); + kHandler.getRoom(c.getRoomId()).stopRecording(c); // In case this user wasn't shareing his screen we also need to close that one c.getScreenStream().ifPresent(sd -> { @@ -494,8 +497,6 @@ public class StreamProcessor implements 
IStreamProcessor { } } if (c.getRoomId() != null) { - KRoom room = kHandler.getRoom(c.getRoomId()); - room.leave(this, c); checkStreams(c.getRoomId()); } } @@ -508,6 +509,12 @@ public class StreamProcessor implements IStreamProcessor { return streamByUid.values(); } + Collection<KStream> getByRoom(Long roomId) { + return streamByUid.values().stream() + .filter(stream -> stream.getRoom() != null && stream.getRoom().getRoomId().equals(roomId)) + .collect(Collectors.toList()); + } + Client getBySid(String sid) { return cm.getBySid(sid); } @@ -533,8 +540,11 @@ public class StreamProcessor implements IStreamProcessor { } @Override - public void release(AbstractStream stream) { + public void release(AbstractStream stream, boolean releaseStream) { final String uid = stream.getUid(); + if (releaseStream) { + stream.release(this); + } Client c = cm.getBySid(stream.getSid()); if (c != null) { StreamDesc sd = c.getStream(uid); diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java index e3a171d..f3a3dad 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java @@ -114,7 +114,7 @@ public class TestStreamProcessor implements IStreamProcessor { } @Override - public void release(AbstractStream stream) { + public void release(AbstractStream stream, boolean releaseStream) { streamByUid.remove(stream.getUid()); } diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java index 84cc14c..270f809 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java +++ 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java @@ -90,14 +90,6 @@ public class ConnectionsPanel extends AdminBasePanel { l.addAll(streams); log.info("Retrieve all Streams, StreamProcessor has {} of streams", streams.size()); - List<KStreamDto> missing = kHandler.getRooms() - .stream() - .flatMap(room -> room.getParticipants().stream()) - .filter(stream -> !streamProcessor.hasStream(stream.getUid())) - .map(kStream -> new KStreamDto("KRoom", kStream)) - .collect(Collectors.toList()); - l.addAll(missing); - log.warn("Following streams were in KRoom but not in StreamProcessor: {}", missing); return l; }