This is an automated email from the ASF dual-hosted git repository. sebawagner pushed a commit to branch feature/OPENMEETINGS-2585-IKurentoHandler-class in repository https://gitbox.apache.org/repos/asf/openmeetings.git
commit 9305adc7a8c10923a00d162d958136dd5242a983 Author: Sebastian Wagner <seba.wag...@gmail.com> AuthorDate: Thu Mar 4 20:17:25 2021 +1300 OPENMEETINGS-2585 Add IKurentoHandler class and IStreamHandler methods. --- .../openmeetings/core/remote/IKurentoHandler.java | 82 ++++++++++++ .../openmeetings/core/remote/IStreamProcessor.java | 60 +++++++++ .../org/apache/openmeetings/core/remote/KRoom.java | 50 +++---- .../apache/openmeetings/core/remote/KStream.java | 18 +-- .../openmeetings/core/remote/KTestStream.java | 12 +- .../openmeetings/core/remote/KurentoHandler.java | 55 +++++--- .../openmeetings/core/remote/StreamProcessor.java | 67 ++++++---- .../core/remote/StreamProcessorActions.java | 9 +- .../core/remote/TestStreamProcessor.java | 147 ++++++++++++++++++++- 9 files changed, 409 insertions(+), 91 deletions(-) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKurentoHandler.java new file mode 100644 index 0000000..8adac94 --- /dev/null +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKurentoHandler.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.openmeetings.core.remote; + +import java.util.Collection; +import java.util.Map; + +import org.apache.openmeetings.core.sip.SipManager; +import org.apache.openmeetings.db.dao.record.RecordingChunkDao; +import org.apache.openmeetings.db.entity.basic.Client; +import org.apache.openmeetings.db.entity.basic.IWsClient; +import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; +import org.apache.openmeetings.db.entity.room.Room; +import org.kurento.client.Continuation; +import org.kurento.client.MediaPipeline; + +import com.github.openjson.JSONArray; +import com.github.openjson.JSONObject; + +public interface IKurentoHandler { + + void init(); + + boolean isConnected(); + + void sendShareUpdated(StreamDesc sd); + + void destroy(); + + void onMessage(IWsClient inClient, JSONObject msg); + + JSONObject getRecordingUser(Long roomId); + + void leaveRoom(Client c); + + void sendClient(String sid, JSONObject msg); + + void sendError(IWsClient c, String msg); + + void remove(IWsClient c); + + MediaPipeline createPipiline(Map<String, String> tags, Continuation<Void> continuation); + + KRoom getRoom(Long roomId); + + Collection<KRoom> getRooms(); + + void updateSipCount(Room r, long count); + + JSONObject newKurentoMsg(); + + JSONArray getTurnServers(Client c); + + JSONArray getTurnServers(Client c, final boolean test); + + TestStreamProcessor getTestProcessor(); + + IStreamProcessor getStreamProcessor(); + + SipManager getSipManager(); + + RecordingChunkDao getChunkDao(); + + int getFlowoutTimeout(); + +} diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java index f142285..a658243 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java +++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java @@ -18,7 +18,67 @@ */ package org.apache.openmeetings.core.remote; +import java.util.Collection; +import java.util.stream.Stream; + +import org.apache.openmeetings.db.entity.basic.Client; +import org.apache.openmeetings.db.entity.basic.Client.Activity; +import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; +import org.apache.openmeetings.db.entity.record.Recording; + +import com.github.openjson.JSONObject; + public interface IStreamProcessor { + + void toggleActivity(Client c, Activity a); + + void rightsUpdated(Client c); + + // Sharing + boolean hasRightsToShare(Client c); + + Stream<KStream> getByRoom(Long roomId); + + KStream getByUid(String uid); + + void onMessage(Client c, final String cmdId, JSONObject msg); + + void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then); + + boolean startConvertion(Recording rec); + + void pauseSharing(Client c, String uid); + + StreamDesc doStopSharing(String sid, String uid); + + Client getBySid(String sid); + + boolean screenShareAllowed(Client c); + + boolean isSharing(Long roomId); + + void addStream(KStream stream); + + boolean hasRightsToRecord(Client c); + + boolean recordingAllowed(Client c); + + void startRecording(Client c); + + void stopRecording(Client c); + + boolean isRecording(Long roomId); + + Collection<KStream> getStreams(); + + boolean hasStream(String uid); + + void remove(Client c); + void release(AbstractStream stream, boolean releaseStream); + void destroy(); + + JSONObject newStoppedMsg(StreamDesc sd); + } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java index 3de644f..9b65c73 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java 
@@ -20,7 +20,6 @@ package org.apache.openmeetings.core.remote; import static java.util.UUID.randomUUID; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; -import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import static org.apache.openmeetings.db.util.ApplicationHelper.ensureApplication; import java.util.Date; @@ -31,6 +30,7 @@ import org.apache.commons.lang3.time.FastDateFormat; import org.apache.openmeetings.IApplication; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.dao.record.RecordingChunkDao; +import org.apache.openmeetings.db.dao.record.RecordingDao; import org.apache.openmeetings.db.entity.basic.Client; import org.apache.openmeetings.db.entity.basic.Client.Activity; import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; @@ -55,8 +55,11 @@ import com.github.openjson.JSONObject; public class KRoom { private static final Logger log = LoggerFactory.getLogger(KRoom.class); - private final StreamProcessor processor; + private final IKurentoHandler kHandler; + private final IStreamProcessor processor; + private final IClientManager cm; private final RecordingChunkDao chunkDao; + private final RecordingDao recDao; private final Room room; private final AtomicBoolean recordingStarted = new AtomicBoolean(false); private final AtomicBoolean sharingStarted = new AtomicBoolean(false); @@ -65,9 +68,12 @@ public class KRoom { private JSONObject recordingUser = new JSONObject(); private JSONObject sharingUser = new JSONObject(); - public KRoom(KurentoHandler handler, Room r) { + public KRoom(IKurentoHandler handler, Room r, IClientManager cm, RecordingDao recDao) { + this.kHandler = handler; + this.cm = cm; this.processor = handler.getStreamProcessor(); this.chunkDao = handler.getChunkDao(); + this.recDao = recDao; this.room = r; log.info("ROOM {} has been created", room.getId()); } @@ -84,16 +90,16 @@ public class KRoom { return chunkDao; } - public KStream 
join(final StreamDesc sd, KurentoHandler kHandler) { + public KStream join(final StreamDesc sd, IKurentoHandler kHandler) { log.info("ROOM {}: join client {}, stream: {}", room.getId(), sd.getClient(), sd.getUid()); - final KStream stream = new KStream(sd, this, kHandler); + final KStream stream = new KStream(sd, this, kHandler, this.processor); processor.addStream(stream); return stream; } public void onStopBroadcast(KStream stream) { processor.release(stream, true); - WebSocketHelper.sendAll(newKurentoMsg() + WebSocketHelper.sendAll(kHandler.newKurentoMsg() .put("id", "broadcastStopped") .put("uid", stream.getUid()) .toString() @@ -144,11 +150,11 @@ public class KRoom { Optional<StreamDesc> osd = c.getScreenStream(); if (osd.isPresent()) { osd.get().addActivity(Activity.RECORD); - processor.getClientManager().update(c); + cm.update(c); rec.setWidth(osd.get().getWidth()); rec.setHeight(osd.get().getHeight()); } - rec = processor.getRecordingDao().update(rec); + rec = recDao.update(rec); // Receive recordingId recordingId = rec.getId(); processor.getByRoom(room.getId()).forEach(KStream::startRecord); @@ -163,9 +169,9 @@ public class KRoom { if (recordingStarted.compareAndSet(true, false)) { log.debug("##REC:: recording in room {} is stopping {} ::", room.getId(), recordingId); processor.getByRoom(room.getId()).forEach(KStream::stopRecord); - Recording rec = processor.getRecordingDao().get(recordingId); + Recording rec = recDao.get(recordingId); rec.setRecordEnd(new Date()); - rec = processor.getRecordingDao().update(rec); + rec = recDao.update(rec); recordingUser = new JSONObject(); recordingId = null; @@ -178,8 +184,8 @@ public class KRoom { Optional<StreamDesc> osd = c.getScreenStream(); if (osd.isPresent()) { osd.get().removeActivity(Activity.RECORD); - processor.getClientManager().update(c); - processor.getHandler().sendShareUpdated(osd.get()); + cm.update(c); + kHandler.sendShareUpdated(osd.get()); } } // Send notification to all users that the recording has 
been started @@ -203,29 +209,28 @@ public class KRoom { return new JSONObject(sharingUser.toString()); } - public void startSharing(StreamProcessor processor, IClientManager cm, Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) { + public void startSharing(IClientManager cm, Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) { StreamDesc sd; - KurentoHandler h = processor.getHandler(); if (sharingStarted.compareAndSet(false, true)) { sharingUser.put("sid", c.getSid()); sd = c.addStream(StreamType.SCREEN, a); cm.update(c); log.debug("Stream.UID {}: sharing has been started, activity: {}", sd.getUid(), a); - h.sendClient(sd.getSid(), newKurentoMsg() + kHandler.sendClient(sd.getSid(), kHandler.newKurentoMsg() .put("id", "broadcast") .put("stream", sd.toJson() .put("shareType", msg.getString("shareType")) .put("fps", msg.getString("fps"))) - .put(PARAM_ICE, h.getTurnServers(c))); + .put(PARAM_ICE, kHandler.getTurnServers(c))); } else if (osd.isPresent() && !osd.get().hasActivity(a)) { sd = osd.get(); sd.addActivity(a); cm.update(c); - h.sendShareUpdated(sd); + kHandler.sendShareUpdated(sd); WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid())); - WebSocketHelper.sendRoomOthers(room.getId(), c.getUid(), newKurentoMsg() + WebSocketHelper.sendRoomOthers(room.getId(), c.getUid(), kHandler.newKurentoMsg() .put("id", "newStream") - .put(PARAM_ICE, processor.getHandler().getTurnServers(c)) + .put(PARAM_ICE, kHandler.getTurnServers(c)) .put("stream", sd.toJson())); } } @@ -245,17 +250,16 @@ public class KRoom { if (count != sipCount) { processor.getByRoom(room.getId()).forEach(stream -> stream.addSipProcessor(count)); if (sipCount == 0) { - processor.getClientManager() - .streamByRoom(room.getId()) + cm.streamByRoom(room.getId()) .filter(Client::isSip) .findAny() .ifPresent(c -> { StreamDesc sd = c.addStream(StreamType.WEBCAM, Activity.AUDIO); sd.setWidth(120).setHeight(90); 
c.restoreActivities(sd); - KStream stream = join(sd, processor.getHandler()); + KStream stream = join(sd, kHandler); stream.startBroadcast(sd, "", () -> {}); - processor.getClientManager().update(c); + cm.update(c); }); } sipCount = count; diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java index aaef0ca..c1ccec3 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java @@ -27,8 +27,6 @@ import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_STREAM_UID; -import static org.apache.openmeetings.core.remote.KurentoHandler.getFlowoutTimeout; -import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import static org.apache.openmeetings.util.OmFileHelper.getRecUri; import static org.apache.openmeetings.util.OmFileHelper.getRecordingChunk; @@ -75,7 +73,8 @@ import com.github.openjson.JSONObject; public class KStream extends AbstractStream implements ISipCallbacks { private static final Logger log = LoggerFactory.getLogger(KStream.class); - private final KurentoHandler kHandler; + private final IKurentoHandler kHandler; + private final IStreamProcessor processor; private final KRoom kRoom; private final Date connectedSince; private final StreamType streamType; @@ -96,9 +95,10 @@ public class KStream extends AbstractStream implements ISipCallbacks { private boolean hasScreen; private boolean sipClient; - public KStream(final StreamDesc sd, KRoom kRoom, KurentoHandler kHandler) { + public KStream(final StreamDesc sd, KRoom kRoom, IKurentoHandler kHandler, 
IStreamProcessor processor) { super(sd.getSid(), sd.getUid()); this.kRoom = kRoom; + this.processor = processor; streamType = sd.getType(); this.connectedSince = new Date(); this.kHandler = kHandler; @@ -189,11 +189,11 @@ public class KStream extends AbstractStream implements ISipCallbacks { flowoutFuture = Optional.of(new CompletableFuture<>().completeAsync(() -> { log.warn("KStream will be dropped {}, sid {}, uid {}", sd, sid, uid); if (StreamType.SCREEN == streamType) { - kHandler.getStreamProcessor().doStopSharing(sid, uid); + processor.doStopSharing(sid, uid); } stopBroadcast(); return null; - }, delayedExecutor(getFlowoutTimeout(), TimeUnit.SECONDS))); + }, delayedExecutor(kHandler.getFlowoutTimeout(), TimeUnit.SECONDS))); } else { dropFlowoutFuture(); } @@ -213,7 +213,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { Client c = sd.getClient(); WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid())); if (hasAudio || hasVideo || hasScreen) { - WebSocketHelper.sendRoomOthers(getRoomId(), c.getUid(), newKurentoMsg() + WebSocketHelper.sendRoomOthers(getRoomId(), c.getUid(), kHandler.newKurentoMsg() .put("id", "newStream") .put(PARAM_ICE, kHandler.getTurnServers(c)) .put("stream", sd.toJson())); @@ -252,7 +252,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { ((WebRtcEndpoint)endpoint).gatherCandidates(); // this one might throw Exception } log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer); - kHandler.sendClient(sid, newKurentoMsg() + kHandler.sendClient(sid, kHandler.newKurentoMsg() .put("id", "videoResponse") .put("uid", this.uid) .put("sdpAnswer", sdpAnswer)); @@ -311,7 +311,7 @@ public class KStream extends AbstractStream implements ISipCallbacks { reApplyIceCandiates(endpoint, recv); endpoint.addIceCandidateFoundListener(evt -> kHandler.sendClient(sid - , newKurentoMsg() + , kHandler.newKurentoMsg() .put("id", "iceCandidate") .put("uid", 
KStream.this.uid) .put(PARAM_CANDIDATE, convert(JsonUtils.toJsonObject(evt.getCandidate())))) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java index 741c1f6..49d411a 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java @@ -23,8 +23,6 @@ import static org.apache.openmeetings.core.remote.KurentoHandler.MODE_TEST; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_MODE; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM; -import static org.apache.openmeetings.core.remote.KurentoHandler.sendError; -import static org.apache.openmeetings.core.remote.TestStreamProcessor.newTestKurentoMsg; import static org.apache.openmeetings.util.OmFileHelper.EXTENSION_WEBM; import static org.apache.openmeetings.util.OmFileHelper.TEST_SETUP_PREFIX; import static org.apache.openmeetings.util.OmFileHelper.getStreamsDir; @@ -55,7 +53,7 @@ import com.github.openjson.JSONObject; public class KTestStream extends AbstractStream { private static final Logger log = LoggerFactory.getLogger(KTestStream.class); private static final Map<String, String> TAGS = Map.of(TAG_MODE, MODE_TEST, TAG_ROOM, MODE_TEST); - private final KurentoHandler kHandler; + private final IKurentoHandler kHandler; private MediaPipeline pipeline; private WebRtcEndpoint webRtcEndpoint; private PlayerEndpoint player; @@ -65,12 +63,16 @@ public class KTestStream extends AbstractStream { private ScheduledFuture<?> recHandle; private int recTime; - public KTestStream(IWsClient c, JSONObject msg, KurentoHandler kHandler) { + public KTestStream(IWsClient c, JSONObject msg, IKurentoHandler kHandler) { super(null, c.getUid()); this.kHandler = 
kHandler; createPipeline(() -> startTestRecording(c, msg)); } + JSONObject newTestKurentoMsg() { + return kHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST); + } + private void startTestRecording(IWsClient c, JSONObject msg) { webRtcEndpoint = createWebRtcEndpoint(pipeline, null); webRtcEndpoint.connect(webRtcEndpoint); @@ -126,7 +128,7 @@ public class KTestStream extends AbstractStream { @Override public void onError(Throwable cause) throws Exception { - sendError(c, "Failed to start recording"); + kHandler.sendError(c, "Failed to start recording"); log.error("Failed to start recording", cause); } }); diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index 1e8ebb4..597f831 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -44,6 +44,7 @@ import javax.crypto.spec.SecretKeySpec; import org.apache.openmeetings.core.sip.SipManager; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.dao.record.RecordingChunkDao; +import org.apache.openmeetings.db.dao.record.RecordingDao; import org.apache.openmeetings.db.dao.room.RoomDao; import org.apache.openmeetings.db.entity.basic.Client; import org.apache.openmeetings.db.entity.basic.Client.Activity; @@ -74,13 +75,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; import com.github.openjson.JSONArray; import com.github.openjson.JSONObject; -@Component -public class KurentoHandler { +public class KurentoHandler implements IKurentoHandler { private static final Logger log = LoggerFactory.getLogger(KurentoHandler.class); public 
static final String PARAM_ICE = "iceServers"; public static final String PARAM_CANDIDATE = "candidate"; @@ -123,17 +122,20 @@ public class KurentoHandler { @Autowired private IClientManager cm; @Autowired + private RecordingDao recDao; + @Autowired private RoomDao roomDao; @Autowired private RecordingChunkDao chunkDao; @Autowired private TestStreamProcessor testProcessor; @Autowired - private StreamProcessor streamProcessor; + private IStreamProcessor streamProcessor; @Autowired private SipManager sipManager; - boolean isConnected() { + @Override + public boolean isConnected() { boolean connctd = connected.get() && client != null && !client.isClosed(); if (!connctd) { log.warn(WARN_NO_KURENTO); @@ -141,6 +143,7 @@ public class KurentoHandler { return connctd; } + @Override @PostConstruct public void init() { check = () -> { @@ -196,6 +199,7 @@ public class KurentoHandler { kmsRecheckScheduler.scheduleAtFixedRate(check, 0L, checkTimeout, MILLISECONDS); } + @Override @PreDestroy public void destroy() { clean(); @@ -233,6 +237,7 @@ public class KurentoHandler { return client.beginTransaction(); } + @Override public void onMessage(IWsClient inClient, JSONObject msg) { if (!isConnected()) { sendError(inClient, "Multimedia server is inaccessible"); @@ -252,6 +257,7 @@ public class KurentoHandler { } } + @Override public JSONObject getRecordingUser(Long roomId) { if (!isConnected()) { return new JSONObject(); @@ -259,6 +265,7 @@ public class KurentoHandler { return getRoom(roomId).getRecordingUser(); } + @Override public void leaveRoom(Client c) { remove(c); WebSocketHelper.sendAll(newKurentoMsg() @@ -268,23 +275,27 @@ public class KurentoHandler { ); } - void sendShareUpdated(StreamDesc sd) { + @Override + public void sendShareUpdated(StreamDesc sd) { sendClient(sd.getSid(), newKurentoMsg() .put("id", "shareUpdated") .put("stream", sd.toJson()) ); } + @Override public void sendClient(String sid, JSONObject msg) { WebSocketHelper.sendClient(cm.getBySid(sid), msg); } - 
public static void sendError(IWsClient c, String msg) { + @Override + public void sendError(IWsClient c, String msg) { WebSocketHelper.sendClient(c, newKurentoMsg() .put("id", "error") .put("message", msg)); } + @Override public void remove(IWsClient c) { if (!isConnected() || c == null) { return; @@ -296,7 +307,8 @@ public class KurentoHandler { streamProcessor.remove((Client)c); } - MediaPipeline createPipiline(Map<String, String> tags, Continuation<Void> continuation) { + @Override + public MediaPipeline createPipiline(Map<String, String> tags, Continuation<Void> continuation) { Transaction t = beginTransaction(); MediaPipeline pipe = client.createMediaPipeline(t); pipe.addTag(t, TAG_KUID, kuid); @@ -305,23 +317,27 @@ public class KurentoHandler { return pipe; } - KRoom getRoom(Long roomId) { + @Override + public KRoom getRoom(Long roomId) { return rooms.computeIfAbsent(roomId, k -> { log.debug("Room {} does not exist. Will create now!", roomId); Room r = roomDao.get(roomId); - return new KRoom(this, r); + return new KRoom(this, r, cm, recDao); }); } + @Override public Collection<KRoom> getRooms() { return rooms.values(); } + @Override public void updateSipCount(Room r, long count) { getRoom(r.getId()).updateSipCount(count); } - static JSONObject newKurentoMsg() { + @Override + public JSONObject newKurentoMsg() { return new JSONObject().put("type", KURENTO_TYPE); } @@ -343,11 +359,13 @@ public class KurentoHandler { return r; } + @Override public JSONArray getTurnServers(Client c) { return getTurnServers(c, false); } - JSONArray getTurnServers(Client c, final boolean test) { + @Override + public JSONArray getTurnServers(Client c, final boolean test) { JSONArray arr = new JSONArray(); if (!Strings.isEmpty(turnUrl)) { try { @@ -397,23 +415,28 @@ public class KurentoHandler { return kuid; } + @Override public TestStreamProcessor getTestProcessor() { return testProcessor; } - StreamProcessor getStreamProcessor() { + @Override + public IStreamProcessor 
getStreamProcessor() { return streamProcessor; } - SipManager getSipManager() { + @Override + public SipManager getSipManager() { return sipManager; } - RecordingChunkDao getChunkDao() { + @Override + public RecordingChunkDao getChunkDao() { return chunkDao; } - static int getFlowoutTimeout() { + @Override + public int getFlowoutTimeout() { return flowoutTimeout; } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java index 3771c0c..368758d 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java @@ -21,7 +21,6 @@ package org.apache.openmeetings.core.remote; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; import static org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed; -import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg; import static org.apache.openmeetings.util.OpenmeetingsVariables.isRecordingsEnabled; import java.util.Collection; @@ -37,7 +36,6 @@ import org.apache.openmeetings.core.converter.IRecordingConverter; import org.apache.openmeetings.core.converter.InterviewConverter; import org.apache.openmeetings.core.converter.RecordingConverter; import org.apache.openmeetings.core.util.WebSocketHelper; -import org.apache.openmeetings.db.dao.record.RecordingDao; import org.apache.openmeetings.db.entity.basic.Client; import org.apache.openmeetings.db.entity.basic.Client.Activity; import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; @@ -54,12 +52,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.task.TaskExecutor; -import org.springframework.stereotype.Component; import com.github.openjson.JSONArray; 
import com.github.openjson.JSONObject; -@Component public class StreamProcessor implements IStreamProcessor { private static final Logger log = LoggerFactory.getLogger(StreamProcessor.class); /** @@ -70,9 +66,7 @@ public class StreamProcessor implements IStreamProcessor { @Autowired private IClientManager cm; @Autowired - private RecordingDao recDao; - @Autowired - private KurentoHandler kHandler; + private IKurentoHandler kHandler; @Autowired private TaskExecutor taskExecutor; @Autowired @@ -83,7 +77,8 @@ public class StreamProcessor implements IStreamProcessor { private StreamProcessorActions streamProcessorActions; @TimedApplication - void onMessage(Client c, final String cmdId, JSONObject msg) { + @Override + public void onMessage(Client c, final String cmdId, JSONObject msg) { final String uid = msg.optString("uid"); StreamDesc sd; Optional<StreamDesc> osd; @@ -163,7 +158,8 @@ public class StreamProcessor implements IStreamProcessor { * @param then steps need to be done after broadcast is started * @return the current KStream */ - void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) { + @Override + public void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) { stream.startBroadcast(sd, sdpOffer, then); } @@ -186,6 +182,7 @@ public class StreamProcessor implements IStreamProcessor { return closed; } + @Override @TimedApplication public void toggleActivity(Client c, Activity a) { log.info("PARTICIPANT {}: trying to toggle activity {}", c, a); @@ -218,11 +215,11 @@ public class StreamProcessor implements IStreamProcessor { Set<String> closed = wasBroadcasting ? 
cleanWebCams(c, streams) : Set.of(); cm.update(c.restoreActivities(sd)); log.debug("User {}: has started broadcast", sd.getUid()); - kHandler.sendClient(sd.getSid(), newKurentoMsg() + kHandler.sendClient(sd.getSid(), kHandler.newKurentoMsg() .put("id", "broadcast") .put("stream", sd.toJson(true)) .put("cleanup", new JSONArray(closed)) - .put(PARAM_ICE, kHandler.getTurnServers(c, false))); + .put(PARAM_ICE, kHandler.getTurnServers(c))); } } } @@ -238,6 +235,7 @@ public class StreamProcessor implements IStreamProcessor { }); } + @Override public void rightsUpdated(Client c) { Optional<StreamDesc> osd = c.getScreenStream(); if (osd.isPresent() && !hasRightsToShare(c)) { @@ -288,6 +286,7 @@ public class StreamProcessor implements IStreamProcessor { } // Sharing + @Override public boolean hasRightsToShare(Client c) { if (!kHandler.isConnected()) { return false; @@ -298,6 +297,7 @@ public class StreamProcessor implements IStreamProcessor { && c.hasRight(Right.SHARE); } + @Override public boolean screenShareAllowed(Client c) { Room r = c.getRoom(); return hasRightsToShare(c) && !isSharing(r.getId()); @@ -322,7 +322,7 @@ public class StreamProcessor implements IStreamProcessor { private void startSharing(Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) { if (kHandler.isConnected() && c.getRoomId() != null) { - kHandler.getRoom(c.getRoomId()).startSharing(this, cm, c, osd, msg, a); + kHandler.getRoom(c.getRoomId()).startSharing(cm, c, osd, msg, a); } } @@ -334,7 +334,8 @@ public class StreamProcessor implements IStreamProcessor { * @param c client * @param uid the uid */ - void pauseSharing(Client c, String uid) { + @Override + public void pauseSharing(Client c, String uid) { if (!hasRightsToShare(c)) { return; } @@ -364,7 +365,8 @@ public class StreamProcessor implements IStreamProcessor { } } - StreamDesc doStopSharing(String sid, String uid) { + @Override + public StreamDesc doStopSharing(String sid, String uid) { return doStopSharing(getBySid(sid), uid); 
} @@ -385,6 +387,7 @@ public class StreamProcessor implements IStreamProcessor { return sd; } + @Override public boolean isSharing(Long roomId) { if (!kHandler.isConnected()) { return false; @@ -394,11 +397,13 @@ public class StreamProcessor implements IStreamProcessor { // Recording + @Override public boolean hasRightsToRecord(Client c) { Room r = c.getRoom(); return isRecordingsEnabled() && r != null && r.isAllowRecording() && c.hasRight(Right.MODERATOR); } + @Override public boolean recordingAllowed(Client c) { if (!kHandler.isConnected() || !isRecordingsEnabled()) { return false; @@ -407,6 +412,7 @@ public class StreamProcessor implements IStreamProcessor { return hasRightsToRecord(c) && !isRecording(r.getId()); } + @Override public void startRecording(Client c) { if (!kHandler.isConnected() || !hasRightsToRecord(c)) { return; @@ -414,6 +420,7 @@ public class StreamProcessor implements IStreamProcessor { kHandler.getRoom(c.getRoomId()).startRecording(c); } + @Override public void stopRecording(Client c) { if (!kHandler.isConnected() || !hasRightsToRecord(c)) { return; @@ -434,12 +441,14 @@ public class StreamProcessor implements IStreamProcessor { * @param rec * @return */ - boolean startConvertion(Recording rec) { + @Override + public boolean startConvertion(Recording rec) { IRecordingConverter conv = rec.isInterview() ? 
interviewConverter : recordingConverter; taskExecutor.execute(() -> conv.startConversion(rec)); return true; } + @Override public boolean isRecording(Long roomId) { if (!kHandler.isConnected()) { return false; @@ -447,7 +456,8 @@ public class StreamProcessor implements IStreamProcessor { return kHandler.getRoom(roomId).isRecording(); } - void remove(Client c) { + @Override + public void remove(Client c) { for (StreamDesc sd : c.getStreams()) { AbstractStream s = getByUid(sd.getUid()); if (s != null) { @@ -461,32 +471,38 @@ public class StreamProcessor implements IStreamProcessor { } } - void addStream(KStream stream) { + @Override + public void addStream(KStream stream) { streamByUid.put(stream.getUid(), stream); } + @Override public Collection<KStream> getStreams() { return streamByUid.values(); } - Stream<KStream> getByRoom(Long roomId) { + @Override + public Stream<KStream> getByRoom(Long roomId) { return streamByUid.values().stream() .filter(stream -> stream.getRoomId().equals(roomId)); } - Client getBySid(String sid) { + @Override + public Client getBySid(String sid) { return cm.getBySid(sid); } + @Override public boolean hasStream(String uid) { return streamByUid.get(uid) != null; } - KStream getByUid(String uid) { + @Override + public KStream getByUid(String uid) { return uid == null ? 
null : streamByUid.get(uid); } - KurentoHandler getHandler() { + IKurentoHandler getHandler() { return kHandler; } @@ -494,10 +510,6 @@ public class StreamProcessor implements IStreamProcessor { return cm; } - RecordingDao getRecordingDao() { - return recDao; - } - @Override public void release(AbstractStream stream, boolean releaseStream) { final String uid = stream.getUid(); @@ -528,8 +540,9 @@ public class StreamProcessor implements IStreamProcessor { } } - protected static JSONObject newStoppedMsg(StreamDesc sd) { - return newKurentoMsg() + @Override + public JSONObject newStoppedMsg(StreamDesc sd) { + return kHandler.newKurentoMsg() .put("id", "broadcastStopped") .put("uid", sd.getUid()); } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java index 8608485..d881e39 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java @@ -20,7 +20,6 @@ package org.apache.openmeetings.core.remote; import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE; -import static org.apache.openmeetings.core.remote.KurentoHandler.sendError; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.entity.basic.Client; @@ -46,9 +45,9 @@ public class StreamProcessorActions { @Autowired private IClientManager cm; @Autowired - private KurentoHandler kHandler; + private IKurentoHandler kHandler; @Autowired - private StreamProcessor streamProcessor; + private IStreamProcessor streamProcessor; @TimedApplication protected void addIceCandidate(JSONObject msg) { @@ -120,8 +119,8 @@ public class StreamProcessorActions { }); } catch (KurentoServerException e) { sender.release(); - WebSocketHelper.sendClient(c, 
StreamProcessor.newStoppedMsg(sd)); - sendError(c, "Failed to start broadcast: " + e.getMessage()); + WebSocketHelper.sendClient(c, streamProcessor.newStoppedMsg(sd)); + kHandler.sendError(c, "Failed to start broadcast: " + e.getMessage()); log.error("Failed to start broadcast", e); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java index 499d772..bf3c5f1 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java @@ -24,12 +24,18 @@ import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE; import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_MODE; +import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; import org.apache.openmeetings.core.util.WebSocketHelper; +import org.apache.openmeetings.db.entity.basic.Client; +import org.apache.openmeetings.db.entity.basic.Client.Activity; +import org.apache.openmeetings.db.entity.basic.Client.StreamDesc; import org.apache.openmeetings.db.entity.basic.IWsClient; +import org.apache.openmeetings.db.entity.record.Recording; import org.kurento.client.IceCandidate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -41,10 +47,10 @@ class TestStreamProcessor implements IStreamProcessor { private final Map<String, KTestStream> streamByUid = new ConcurrentHashMap<>(); @Autowired - private KurentoHandler kHandler; + private IKurentoHandler kHandler; void onMessage(IWsClient c, final String cmdId, JSONObject msg) { - KTestStream user = getByUid(c.getUid()); + 
KTestStream user = getByUid1(c.getUid()); switch (cmdId) { case "wannaRecord": WebSocketHelper.sendClient(c, newTestKurentoMsg() @@ -84,16 +90,16 @@ class TestStreamProcessor implements IStreamProcessor { } } - private KTestStream getByUid(String uid) { + public KTestStream getByUid1(String uid) { return uid == null ? null : streamByUid.get(uid); } - static JSONObject newTestKurentoMsg() { - return KurentoHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST); + JSONObject newTestKurentoMsg() { + return kHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST); } void remove(IWsClient c) { - AbstractStream s = getByUid(c.getUid()); + AbstractStream s = getByUid1(c.getUid()); if (s != null) { s.release(); } @@ -112,4 +118,133 @@ class TestStreamProcessor implements IStreamProcessor { } streamByUid.clear(); } + + @Override + public void toggleActivity(Client c, Activity a) { + // TODO Auto-generated method stub + } + + @Override + public void rightsUpdated(Client c) { + // TODO Auto-generated method stub + } + + @Override + public boolean hasRightsToShare(Client c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean screenShareAllowed(Client c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isSharing(Long roomId) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean hasRightsToRecord(Client c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean recordingAllowed(Client c) { + // TODO Auto-generated method stub + return false; + } + + @Override + public void startRecording(Client c) { + // TODO Auto-generated method stub + } + + @Override + public void stopRecording(Client c) { + // TODO Auto-generated method stub + } + + @Override + public boolean isRecording(Long roomId) { + // TODO Auto-generated method stub + return false; + } + + @Override + public Collection<KStream> getStreams() { + // TODO Auto-generated method stub + 
return null; + } + + @Override + public boolean hasStream(String uid) { + // TODO Auto-generated method stub + return false; + } + + @Override + public void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, Runnable then) { + // TODO Auto-generated method stub + } + + @Override + public boolean startConvertion(Recording rec) { + // TODO Auto-generated method stub + return false; + } + + @Override + public void pauseSharing(Client c, String uid) { + // TODO Auto-generated method stub + } + + @Override + public Client getBySid(String sid) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void addStream(KStream stream) { + // TODO Auto-generated method stub + } + + @Override + public StreamDesc doStopSharing(String sid, String uid) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Stream<KStream> getByRoom(Long roomId) { + // TODO Auto-generated method stub + return null; + } + + @Override + public KStream getByUid(String uid) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void onMessage(Client c, String cmdId, JSONObject msg) { + // TODO Auto-generated method stub + } + + @Override + public void remove(Client c) { + // TODO Auto-generated method stub + } + + @Override + public JSONObject newStoppedMsg(StreamDesc sd) { + // TODO Auto-generated method stub + return null; + } }