This is an automated email from the ASF dual-hosted git repository. sebawagner pushed a commit to branch feature/OPENMEETINGS-2315-refactor-kstream-to-streamprocessor in repository https://gitbox.apache.org/repos/asf/openmeetings.git
commit a51b4fcd16fd7885d4e1754c7cee8af40e1a0c4a Author: Sebastian Wagner <sebawag...@apache.org> AuthorDate: Thu Apr 30 09:27:44 2020 +1200 OPENMEETINGS-2315 Only StreamProcessor to hold reference of KStream --- .../org/apache/openmeetings/core/remote/KRoom.java | 75 +++++++++------------- .../apache/openmeetings/core/remote/KStream.java | 2 +- .../openmeetings/core/remote/KurentoHandler.java | 6 +- .../openmeetings/core/remote/StreamProcessor.java | 22 +++++-- .../core/remote/TestStreamProcessor.java | 1 + .../web/admin/connection/ConnectionsPanel.java | 8 --- 6 files changed, 52 insertions(+), 62 deletions(-) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java index 47abfdd..fc414dc 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java @@ -27,9 +27,7 @@ import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import java.util.Collection; import java.util.Date; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.openmeetings.core.util.WebSocketHelper; @@ -53,10 +51,16 @@ import org.slf4j.LoggerFactory; import com.github.openjson.JSONObject; +/** + * Bean object dynamically created representing a conference room on the MediaServer + * + */ public class KRoom { private static final Logger log = LoggerFactory.getLogger(KRoom.class); - - private final Map<String, KStream> streams = new ConcurrentHashMap<>(); + /** + * Not injected by annotation but by constructor. + */ + private final StreamProcessor streamProcessor; private final MediaPipeline pipeline; private final Long roomId; private final Room.Type type; @@ -67,7 +71,8 @@ public class KRoom { private JSONObject recordingUser = new JSONObject(); private JSONObject sharingUser = new JSONObject(); - public KRoom(Room r, MediaPipeline pipeline, RecordingChunkDao chunkDao) { + public KRoom(StreamProcessor streamProcessor, Room r, MediaPipeline pipeline, RecordingChunkDao chunkDao) { + this.streamProcessor = streamProcessor; this.roomId = r.getId(); this.type = r.getType(); this.pipeline = pipeline; @@ -98,16 +103,16 @@ public class KRoom { public KStream join(final StreamDesc sd) { log.info("ROOM {}: join client {}, stream: {}", roomId, sd.getClient(), sd.getUid()); final KStream stream = new KStream(sd, this); - streams.put(stream.getUid(), stream); + streamProcessor.addStream(stream); return stream; } public Collection<KStream> getParticipants() { - return streams.values(); + return streamProcessor.getStreamsByRoom(this.getRoomId()); } public void onStopBroadcast(KStream stream, final StreamProcessor processor) { - streams.remove(stream.getUid()); + streamProcessor.release(stream); stream.release(processor); WebSocketHelper.sendAll(newKurentoMsg() .put("id", "broadcastStopped") @@ -118,21 +123,6 @@ public class KRoom { //FIXME TODO permission can be removed, some listener might be required } - public void leave(final StreamProcessor processor, final Client c) { - for (Map.Entry<String, KStream> e : streams.entrySet()) { - e.getValue().remove(c); - } - for (StreamDesc sd : c.getStreams()) { - if (StreamType.SCREEN == sd.getType()) { - - } - KStream stream = streams.remove(sd.getUid()); - if (stream != null) { - stream.release(processor); - } - } - } - public boolean isRecording() { return recordingStarted.get(); } @@ -141,7 +131,7 @@ public class KRoom { return new JSONObject(recordingUser.toString()); } - public void startRecording(StreamProcessor processor, Client c) { + public void startRecording(Client c) { if (recordingStarted.compareAndSet(false, true)) { log.debug("##REC:: recording in room {} is starting ::", roomId); Room r = c.getRoom(); @@ -173,16 +163,16 @@ public class KRoom { Optional<StreamDesc> osd = c.getScreenStream(); if (osd.isPresent()) { osd.get().addActivity(Activity.RECORD); - processor.getClientManager().update(c); + streamProcessor.getClientManager().update(c); rec.setWidth(osd.get().getWidth()); rec.setHeight(osd.get().getHeight()); } - rec = processor.getRecordingDao().update(rec); + rec = streamProcessor.getRecordingDao().update(rec); // Receive recordingId recordingId = rec.getId(); - for (final KStream stream : streams.values()) { - stream.startRecord(processor); - } + streamProcessor.getStreamsByRoom(this.getRoomId()).forEach( + stream -> stream.startRecord(streamProcessor) + ); // Send notification to all users that the recording has been started WebSocketHelper.sendRoom(new RoomMessage(roomId, u, RoomMessage.Type.RECORDING_TOGGLED)); @@ -190,19 +180,19 @@ public class KRoom { } } - public void stopRecording(final StreamProcessor processor, Client c) { + public void stopRecording(Client c) { if (recordingStarted.compareAndSet(true, false)) { log.debug("##REC:: recording in room {} is stopping {} ::", roomId, recordingId); - for (final KStream stream : streams.values()) { - stream.stopRecord(); - } - Recording rec = processor.getRecordingDao().get(recordingId); + streamProcessor.getStreamsByRoom(this.getRoomId()).forEach( + stream -> stream.stopRecord() + ); + Recording rec = streamProcessor.getRecordingDao().get(recordingId); rec.setRecordEnd(new Date()); - rec = processor.getRecordingDao().update(rec); + rec = streamProcessor.getRecordingDao().update(rec); recordingUser = new JSONObject(); recordingId = null; - processor.startConvertion(rec); + streamProcessor.startConvertion(rec); User u; if (c == null) { u = new User(); @@ -211,8 +201,8 @@ public class KRoom { Optional<StreamDesc> osd = c.getScreenStream(); if (osd.isPresent()) { osd.get().removeActivity(Activity.RECORD); - processor.getClientManager().update(c); - processor.getHandler().sendShareUpdated(osd.get()); + streamProcessor.getClientManager().update(c); + streamProcessor.getHandler().sendShareUpdated(osd.get()); } } // Send notification to all users that the recording has been started @@ -270,11 +260,10 @@ public class KRoom { } } - public void close(final StreamProcessor processor) { - for (final KStream stream : streams.values()) { - stream.release(processor); - } - streams.clear(); + public void close() { + streamProcessor.getStreamsByRoom(this.getRoomId()).forEach( + stream -> stream.release(streamProcessor) + ); pipeline.release(new Continuation<Void>() { @Override public void onSuccess(Void result) throws Exception { diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java index 397d25f..e7cb42b 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java @@ -144,7 +144,7 @@ public class KStream extends AbstractStream { } }); outgoingMedia.addMediaFlowInStateChangeListener(evt -> log.warn("Media FlowIn :: {}", evt)); - processor.addStream(this); + // processor.addStream(this); addListener(processor, sd.getSid(), sd.getUid(), sdpOffer); if (room.isRecording()) { startRecord(processor); diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index ade553e..e7d60405 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -142,7 +142,7 @@ public class KurentoHandler { kuid = randomUUID().toString(); // will be changed to prevent double events client.destroy(); for (Entry<Long, KRoom> e : rooms.entrySet()) { - e.getValue().close(streamProcessor); + e.getValue().close(); } testProcessor.destroy(); streamProcessor.destroy(); @@ -238,7 +238,7 @@ public class KurentoHandler { pipe.addTag(t, TAG_KUID, kuid); pipe.addTag(t, TAG_ROOM, String.valueOf(roomId)); t.commit(); - room = new KRoom(r, pipe, chunkDao); + room = new KRoom(streamProcessor, r, pipe, chunkDao); rooms.put(roomId, room); } log.debug("Room {} found!", roomId); @@ -426,7 +426,7 @@ public class KurentoHandler { return; } else if (r != null) { rooms.remove(r.getRoomId()); - r.close(streamProcessor); + r.close(); } } log.warn("Invalid MediaPipeline {} detected, will be dropped, tags: {}", pipe.getId(), tags); diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java index 32cda74..779928a 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java @@ -25,6 +25,7 @@ import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import static org.apache.openmeetings.core.remote.KurentoHandler.sendError; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -65,6 +66,9 @@ import com.github.openjson.JSONObject; @Component public class StreamProcessor implements IStreamProcessor { private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class); + /** + * Holds a reference to the current streams available on the server instance + */ private final Map<String, KStream> streamByUid = new ConcurrentHashMap<>(); @Autowired @@ -305,7 +309,7 @@ public class StreamProcessor implements IStreamProcessor { room.stopSharing(); if (Room.Type.INTERVIEW != room.getType() && room.isRecording()) { log.info("No more screen streams in the non-interview room, stopping recording"); - room.stopRecording(this, null); + room.stopRecording(null); } } } @@ -315,7 +319,7 @@ public class StreamProcessor implements IStreamProcessor { .collect(Collectors.toList()); if (streams.isEmpty()) { log.info("No more streams in the room, stopping recording"); - room.stopRecording(this, null); + room.stopRecording(null); } } } @@ -447,14 +451,14 @@ public class StreamProcessor implements IStreamProcessor { if (!kHandler.isConnected() || !hasRightsToRecord(c)) { return; } - kHandler.getRoom(c.getRoomId()).startRecording(this, c); + kHandler.getRoom(c.getRoomId()).startRecording(c); } public void stopRecording(Client c) { if (!kHandler.isConnected() || !hasRightsToRecord(c)) { return; } - kHandler.getRoom(c.getRoomId()).stopRecording(this, c); + kHandler.getRoom(c.getRoomId()).stopRecording(c); // In case this user wasn't shareing his screen we also need to close that one c.getScreenStream().ifPresent(sd -> { @@ -494,13 +498,11 @@ public class StreamProcessor implements IStreamProcessor { } } if (c.getRoomId() != null) { - KRoom room = kHandler.getRoom(c.getRoomId()); - room.leave(this, c); checkStreams(c.getRoomId()); } } - void addStream(KStream stream) { + public void addStream(KStream stream) { streamByUid.put(stream.getUid(), stream); } @@ -508,6 +510,12 @@ public class StreamProcessor implements IStreamProcessor { return streamByUid.values(); } + public Collection<KStream> getStreamsByRoom(Long roomId) { + return streamByUid.values().stream() + .filter(stream -> stream.getRoom() != null && stream.getRoom().getRoomId().equals(roomId)) + .collect(Collectors.toCollection(ArrayList::new)); + } + Client getBySid(String sid) { return cm.getBySid(sid); } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java index e3a171d..764e458 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java @@ -126,4 +126,5 @@ public class TestStreamProcessor implements IStreamProcessor { } streamByUid.clear(); } + } diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java index 84cc14c..270f809 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/ConnectionsPanel.java @@ -90,14 +90,6 @@ public class ConnectionsPanel extends AdminBasePanel { l.addAll(streams); log.info("Retrieve all Streams, StreamProcessor has {} of streams", streams.size()); - List<KStreamDto> missing = kHandler.getRooms() - .stream() - .flatMap(room -> room.getParticipants().stream()) - .filter(stream -> !streamProcessor.hasStream(stream.getUid())) - .map(kStream -> new KStreamDto("KRoom", kStream)) - .collect(Collectors.toList()); - l.addAll(missing); - log.warn("Following streams were in KRoom but not in StreamProcessor: {}", missing); return l; }