This is an automated email from the ASF dual-hosted git repository.

sebawagner pushed a commit to branch 
feature/OPENMEETINGS-2585-IKurentoHandler-class
in repository https://gitbox.apache.org/repos/asf/openmeetings.git

commit 9305adc7a8c10923a00d162d958136dd5242a983
Author: Sebastian Wagner <seba.wag...@gmail.com>
AuthorDate: Thu Mar 4 20:17:25 2021 +1300

    OPENMEETINGS-2585 Add IKurentoHandler class and IStreamHandler methods.
---
 .../openmeetings/core/remote/IKurentoHandler.java  |  82 ++++++++++++
 .../openmeetings/core/remote/IStreamProcessor.java |  60 +++++++++
 .../org/apache/openmeetings/core/remote/KRoom.java |  50 +++----
 .../apache/openmeetings/core/remote/KStream.java   |  18 +--
 .../openmeetings/core/remote/KTestStream.java      |  12 +-
 .../openmeetings/core/remote/KurentoHandler.java   |  55 +++++---
 .../openmeetings/core/remote/StreamProcessor.java  |  67 ++++++----
 .../core/remote/StreamProcessorActions.java        |   9 +-
 .../core/remote/TestStreamProcessor.java           | 147 ++++++++++++++++++++-
 9 files changed, 409 insertions(+), 91 deletions(-)

diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKurentoHandler.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKurentoHandler.java
new file mode 100644
index 0000000..8adac94
--- /dev/null
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IKurentoHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License") +  you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openmeetings.core.remote;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.openmeetings.core.sip.SipManager;
+import org.apache.openmeetings.db.dao.record.RecordingChunkDao;
+import org.apache.openmeetings.db.entity.basic.Client;
+import org.apache.openmeetings.db.entity.basic.IWsClient;
+import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
+import org.apache.openmeetings.db.entity.room.Room;
+import org.kurento.client.Continuation;
+import org.kurento.client.MediaPipeline;
+
+import com.github.openjson.JSONArray;
+import com.github.openjson.JSONObject;
+
+public interface IKurentoHandler {
+
+       void init();
+
+       boolean isConnected();
+
+       void sendShareUpdated(StreamDesc sd);
+
+       void destroy();
+
+       void onMessage(IWsClient inClient, JSONObject msg);
+
+       JSONObject getRecordingUser(Long roomId);
+
+       void leaveRoom(Client c);
+
+       void sendClient(String sid, JSONObject msg);
+
+       void sendError(IWsClient c, String msg);
+
+       void remove(IWsClient c);
+
+       MediaPipeline createPipiline(Map<String, String> tags, 
Continuation<Void> continuation);
+
+       KRoom getRoom(Long roomId);
+
+       Collection<KRoom> getRooms();
+
+       void updateSipCount(Room r, long count);
+
+       JSONObject newKurentoMsg();
+
+       JSONArray getTurnServers(Client c);
+
+       JSONArray getTurnServers(Client c, final boolean test);
+
+       TestStreamProcessor getTestProcessor();
+
+       IStreamProcessor getStreamProcessor();
+
+       SipManager getSipManager();
+
+       RecordingChunkDao getChunkDao();
+
+       int getFlowoutTimeout();
+
+}
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
index f142285..a658243 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/IStreamProcessor.java
@@ -18,7 +18,67 @@
  */
 package org.apache.openmeetings.core.remote;
 
+import java.util.Collection;
+import java.util.stream.Stream;
+
+import org.apache.openmeetings.db.entity.basic.Client;
+import org.apache.openmeetings.db.entity.basic.Client.Activity;
+import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
+import org.apache.openmeetings.db.entity.record.Recording;
+
+import com.github.openjson.JSONObject;
+
 public interface IStreamProcessor {
+
+       void toggleActivity(Client c, Activity a);
+
+       void rightsUpdated(Client c);
+
+       // Sharing
+       boolean hasRightsToShare(Client c);
+
+       Stream<KStream> getByRoom(Long roomId);
+
+       KStream getByUid(String uid);
+
+       void onMessage(Client c, final String cmdId, JSONObject msg);
+
+       void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, 
Runnable then);
+
+       boolean startConvertion(Recording rec);
+
+       void pauseSharing(Client c, String uid);
+
+       StreamDesc doStopSharing(String sid, String uid);
+
+       Client getBySid(String sid);
+
+       boolean screenShareAllowed(Client c);
+
+       boolean isSharing(Long roomId);
+
+       void addStream(KStream stream);
+
+       boolean hasRightsToRecord(Client c);
+
+       boolean recordingAllowed(Client c);
+
+       void startRecording(Client c);
+
+       void stopRecording(Client c);
+
+       boolean isRecording(Long roomId);
+
+       Collection<KStream> getStreams();
+
+       boolean hasStream(String uid);
+
+       void remove(Client c);
+
        void release(AbstractStream stream, boolean releaseStream);
+
        void destroy();
+
+       JSONObject newStoppedMsg(StreamDesc sd);
+
 }
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
index 3de644f..9b65c73 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java
@@ -20,7 +20,6 @@ package org.apache.openmeetings.core.remote;
 
 import static java.util.UUID.randomUUID;
 import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
-import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
 import static 
org.apache.openmeetings.db.util.ApplicationHelper.ensureApplication;
 
 import java.util.Date;
@@ -31,6 +30,7 @@ import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.openmeetings.IApplication;
 import org.apache.openmeetings.core.util.WebSocketHelper;
 import org.apache.openmeetings.db.dao.record.RecordingChunkDao;
+import org.apache.openmeetings.db.dao.record.RecordingDao;
 import org.apache.openmeetings.db.entity.basic.Client;
 import org.apache.openmeetings.db.entity.basic.Client.Activity;
 import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
@@ -55,8 +55,11 @@ import com.github.openjson.JSONObject;
 public class KRoom {
        private static final Logger log = LoggerFactory.getLogger(KRoom.class);
 
-       private final StreamProcessor processor;
+       private final IKurentoHandler kHandler;
+       private final IStreamProcessor processor;
+       private final IClientManager cm;
        private final RecordingChunkDao chunkDao;
+       private final RecordingDao recDao;
        private final Room room;
        private final AtomicBoolean recordingStarted = new AtomicBoolean(false);
        private final AtomicBoolean sharingStarted = new AtomicBoolean(false);
@@ -65,9 +68,12 @@ public class KRoom {
        private JSONObject recordingUser = new JSONObject();
        private JSONObject sharingUser = new JSONObject();
 
-       public KRoom(KurentoHandler handler, Room r) {
+       public KRoom(IKurentoHandler handler, Room r, IClientManager cm, 
RecordingDao recDao) {
+               this.kHandler = handler;
+               this.cm = cm;
                this.processor = handler.getStreamProcessor();
                this.chunkDao = handler.getChunkDao();
+               this.recDao = recDao;
                this.room = r;
                log.info("ROOM {} has been created", room.getId());
        }
@@ -84,16 +90,16 @@ public class KRoom {
                return chunkDao;
        }
 
-       public KStream join(final StreamDesc sd, KurentoHandler kHandler) {
+       public KStream join(final StreamDesc sd, IKurentoHandler kHandler) {
                log.info("ROOM {}: join client {}, stream: {}", room.getId(), 
sd.getClient(), sd.getUid());
-               final KStream stream = new KStream(sd, this, kHandler);
+               final KStream stream = new KStream(sd, this, kHandler, 
this.processor);
                processor.addStream(stream);
                return stream;
        }
 
        public void onStopBroadcast(KStream stream) {
                processor.release(stream, true);
-               WebSocketHelper.sendAll(newKurentoMsg()
+               WebSocketHelper.sendAll(kHandler.newKurentoMsg()
                                .put("id", "broadcastStopped")
                                .put("uid", stream.getUid())
                                .toString()
@@ -144,11 +150,11 @@ public class KRoom {
                        Optional<StreamDesc> osd = c.getScreenStream();
                        if (osd.isPresent()) {
                                osd.get().addActivity(Activity.RECORD);
-                               processor.getClientManager().update(c);
+                               cm.update(c);
                                rec.setWidth(osd.get().getWidth());
                                rec.setHeight(osd.get().getHeight());
                        }
-                       rec = processor.getRecordingDao().update(rec);
+                       rec = recDao.update(rec);
                        // Receive recordingId
                        recordingId = rec.getId();
                        
processor.getByRoom(room.getId()).forEach(KStream::startRecord);
@@ -163,9 +169,9 @@ public class KRoom {
                if (recordingStarted.compareAndSet(true, false)) {
                        log.debug("##REC:: recording in room {} is stopping {} 
::", room.getId(), recordingId);
                        
processor.getByRoom(room.getId()).forEach(KStream::stopRecord);
-                       Recording rec = 
processor.getRecordingDao().get(recordingId);
+                       Recording rec = recDao.get(recordingId);
                        rec.setRecordEnd(new Date());
-                       rec = processor.getRecordingDao().update(rec);
+                       rec = recDao.update(rec);
                        recordingUser = new JSONObject();
                        recordingId = null;
 
@@ -178,8 +184,8 @@ public class KRoom {
                                Optional<StreamDesc> osd = c.getScreenStream();
                                if (osd.isPresent()) {
                                        
osd.get().removeActivity(Activity.RECORD);
-                                       processor.getClientManager().update(c);
-                                       
processor.getHandler().sendShareUpdated(osd.get());
+                                       cm.update(c);
+                                       kHandler.sendShareUpdated(osd.get());
                                }
                        }
                        // Send notification to all users that the recording 
has been started
@@ -203,29 +209,28 @@ public class KRoom {
                return new JSONObject(sharingUser.toString());
        }
 
-       public void startSharing(StreamProcessor processor, IClientManager cm, 
Client c, Optional<StreamDesc> osd, JSONObject msg, Activity a) {
+       public void startSharing(IClientManager cm, Client c, 
Optional<StreamDesc> osd, JSONObject msg, Activity a) {
                StreamDesc sd;
-               KurentoHandler h = processor.getHandler();
                if (sharingStarted.compareAndSet(false, true)) {
                        sharingUser.put("sid", c.getSid());
                        sd = c.addStream(StreamType.SCREEN, a);
                        cm.update(c);
                        log.debug("Stream.UID {}: sharing has been started, 
activity: {}", sd.getUid(), a);
-                       h.sendClient(sd.getSid(), newKurentoMsg()
+                       kHandler.sendClient(sd.getSid(), 
kHandler.newKurentoMsg()
                                        .put("id", "broadcast")
                                        .put("stream", sd.toJson()
                                                        .put("shareType", 
msg.getString("shareType"))
                                                        .put("fps", 
msg.getString("fps")))
-                                       .put(PARAM_ICE, h.getTurnServers(c)));
+                                       .put(PARAM_ICE, 
kHandler.getTurnServers(c)));
                } else if (osd.isPresent() && !osd.get().hasActivity(a)) {
                        sd = osd.get();
                        sd.addActivity(a);
                        cm.update(c);
-                       h.sendShareUpdated(sd);
+                       kHandler.sendShareUpdated(sd);
                        WebSocketHelper.sendRoom(new 
TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid()));
-                       WebSocketHelper.sendRoomOthers(room.getId(), 
c.getUid(), newKurentoMsg()
+                       WebSocketHelper.sendRoomOthers(room.getId(), 
c.getUid(), kHandler.newKurentoMsg()
                                        .put("id", "newStream")
-                                       .put(PARAM_ICE, 
processor.getHandler().getTurnServers(c))
+                                       .put(PARAM_ICE, 
kHandler.getTurnServers(c))
                                        .put("stream", sd.toJson()));
                }
        }
@@ -245,17 +250,16 @@ public class KRoom {
                if (count != sipCount) {
                        processor.getByRoom(room.getId()).forEach(stream -> 
stream.addSipProcessor(count));
                        if (sipCount == 0) {
-                               processor.getClientManager()
-                                       .streamByRoom(room.getId())
+                               cm.streamByRoom(room.getId())
                                        .filter(Client::isSip)
                                        .findAny()
                                        .ifPresent(c -> {
                                                StreamDesc sd = 
c.addStream(StreamType.WEBCAM, Activity.AUDIO);
                                                sd.setWidth(120).setHeight(90);
                                                c.restoreActivities(sd);
-                                               KStream stream = join(sd, 
processor.getHandler());
+                                               KStream stream = join(sd, 
kHandler);
                                                stream.startBroadcast(sd, "", 
() -> {});
-                                               
processor.getClientManager().update(c);
+                                               cm.update(c);
                                        });
                        }
                        sipCount = count;
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
index aaef0ca..c1ccec3 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java
@@ -27,8 +27,6 @@ import static 
org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE
 import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
 import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM;
 import static 
org.apache.openmeetings.core.remote.KurentoHandler.TAG_STREAM_UID;
-import static 
org.apache.openmeetings.core.remote.KurentoHandler.getFlowoutTimeout;
-import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
 import static org.apache.openmeetings.util.OmFileHelper.getRecUri;
 import static org.apache.openmeetings.util.OmFileHelper.getRecordingChunk;
 
@@ -75,7 +73,8 @@ import com.github.openjson.JSONObject;
 public class KStream extends AbstractStream implements ISipCallbacks {
        private static final Logger log = 
LoggerFactory.getLogger(KStream.class);
 
-       private final KurentoHandler kHandler;
+       private final IKurentoHandler kHandler;
+       private final IStreamProcessor processor;
        private final KRoom kRoom;
        private final Date connectedSince;
        private final StreamType streamType;
@@ -96,9 +95,10 @@ public class KStream extends AbstractStream implements 
ISipCallbacks {
        private boolean hasScreen;
        private boolean sipClient;
 
-       public KStream(final StreamDesc sd, KRoom kRoom, KurentoHandler 
kHandler) {
+       public KStream(final StreamDesc sd, KRoom kRoom, IKurentoHandler 
kHandler, IStreamProcessor processor) {
                super(sd.getSid(), sd.getUid());
                this.kRoom = kRoom;
+               this.processor = processor;
                streamType = sd.getType();
                this.connectedSince = new Date();
                this.kHandler = kHandler;
@@ -189,11 +189,11 @@ public class KStream extends AbstractStream implements 
ISipCallbacks {
                                flowoutFuture = Optional.of(new 
CompletableFuture<>().completeAsync(() -> {
                                        log.warn("KStream will be dropped {}, 
sid {}, uid {}", sd, sid, uid);
                                        if (StreamType.SCREEN == streamType) {
-                                               
kHandler.getStreamProcessor().doStopSharing(sid, uid);
+                                               processor.doStopSharing(sid, 
uid);
                                        }
                                        stopBroadcast();
                                        return null;
-                               }, delayedExecutor(getFlowoutTimeout(), 
TimeUnit.SECONDS)));
+                               }, 
delayedExecutor(kHandler.getFlowoutTimeout(), TimeUnit.SECONDS)));
                        } else {
                                dropFlowoutFuture();
                        }
@@ -213,7 +213,7 @@ public class KStream extends AbstractStream implements 
ISipCallbacks {
                Client c = sd.getClient();
                WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, 
RoomMessage.Type.RIGHT_UPDATED, c.getUid()));
                if (hasAudio || hasVideo || hasScreen) {
-                       WebSocketHelper.sendRoomOthers(getRoomId(), c.getUid(), 
newKurentoMsg()
+                       WebSocketHelper.sendRoomOthers(getRoomId(), c.getUid(), 
kHandler.newKurentoMsg()
                                        .put("id", "newStream")
                                        .put(PARAM_ICE, 
kHandler.getTurnServers(c))
                                        .put("stream", sd.toJson()));
@@ -252,7 +252,7 @@ public class KStream extends AbstractStream implements 
ISipCallbacks {
                        ((WebRtcEndpoint)endpoint).gatherCandidates(); // this 
one might throw Exception
                }
                log.trace("USER {}: SdpAnswer is {}", this.uid, sdpAnswer);
-               kHandler.sendClient(sid, newKurentoMsg()
+               kHandler.sendClient(sid, kHandler.newKurentoMsg()
                                .put("id", "videoResponse")
                                .put("uid", this.uid)
                                .put("sdpAnswer", sdpAnswer));
@@ -311,7 +311,7 @@ public class KStream extends AbstractStream implements 
ISipCallbacks {
                reApplyIceCandiates(endpoint, recv);
 
                endpoint.addIceCandidateFoundListener(evt -> 
kHandler.sendClient(sid
-                               , newKurentoMsg()
+                               , kHandler.newKurentoMsg()
                                        .put("id", "iceCandidate")
                                        .put("uid", KStream.this.uid)
                                        .put(PARAM_CANDIDATE, 
convert(JsonUtils.toJsonObject(evt.getCandidate()))))
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
index 741c1f6..49d411a 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
@@ -23,8 +23,6 @@ import static 
org.apache.openmeetings.core.remote.KurentoHandler.MODE_TEST;
 import static 
org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
 import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_MODE;
 import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_ROOM;
-import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
-import static 
org.apache.openmeetings.core.remote.TestStreamProcessor.newTestKurentoMsg;
 import static org.apache.openmeetings.util.OmFileHelper.EXTENSION_WEBM;
 import static org.apache.openmeetings.util.OmFileHelper.TEST_SETUP_PREFIX;
 import static org.apache.openmeetings.util.OmFileHelper.getStreamsDir;
@@ -55,7 +53,7 @@ import com.github.openjson.JSONObject;
 public class KTestStream extends AbstractStream {
        private static final Logger log = 
LoggerFactory.getLogger(KTestStream.class);
        private static final Map<String, String> TAGS = Map.of(TAG_MODE, 
MODE_TEST, TAG_ROOM, MODE_TEST);
-       private final KurentoHandler kHandler;
+       private final IKurentoHandler kHandler;
        private MediaPipeline pipeline;
        private WebRtcEndpoint webRtcEndpoint;
        private PlayerEndpoint player;
@@ -65,12 +63,16 @@ public class KTestStream extends AbstractStream {
        private ScheduledFuture<?> recHandle;
        private int recTime;
 
-       public KTestStream(IWsClient c, JSONObject msg, KurentoHandler 
kHandler) {
+       public KTestStream(IWsClient c, JSONObject msg, IKurentoHandler 
kHandler) {
                super(null, c.getUid());
                this.kHandler = kHandler;
                createPipeline(() -> startTestRecording(c, msg));
        }
 
+       JSONObject newTestKurentoMsg() {
+               return kHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST);
+       }
+
        private void startTestRecording(IWsClient c, JSONObject msg) {
                webRtcEndpoint = createWebRtcEndpoint(pipeline, null);
                webRtcEndpoint.connect(webRtcEndpoint);
@@ -126,7 +128,7 @@ public class KTestStream extends AbstractStream {
 
                        @Override
                        public void onError(Throwable cause) throws Exception {
-                               sendError(c, "Failed to start recording");
+                               kHandler.sendError(c, "Failed to start 
recording");
                                log.error("Failed to start recording", cause);
                        }
                });
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 1e8ebb4..597f831 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -44,6 +44,7 @@ import javax.crypto.spec.SecretKeySpec;
 import org.apache.openmeetings.core.sip.SipManager;
 import org.apache.openmeetings.core.util.WebSocketHelper;
 import org.apache.openmeetings.db.dao.record.RecordingChunkDao;
+import org.apache.openmeetings.db.dao.record.RecordingDao;
 import org.apache.openmeetings.db.dao.room.RoomDao;
 import org.apache.openmeetings.db.entity.basic.Client;
 import org.apache.openmeetings.db.entity.basic.Client.Activity;
@@ -74,13 +75,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
 
 import com.github.openjson.JSONArray;
 import com.github.openjson.JSONObject;
 
-@Component
-public class KurentoHandler {
+public class KurentoHandler implements IKurentoHandler {
        private static final Logger log = 
LoggerFactory.getLogger(KurentoHandler.class);
        public static final String PARAM_ICE = "iceServers";
        public static final String PARAM_CANDIDATE = "candidate";
@@ -123,17 +122,20 @@ public class KurentoHandler {
        @Autowired
        private IClientManager cm;
        @Autowired
+       private RecordingDao recDao;
+       @Autowired
        private RoomDao roomDao;
        @Autowired
        private RecordingChunkDao chunkDao;
        @Autowired
        private TestStreamProcessor testProcessor;
        @Autowired
-       private StreamProcessor streamProcessor;
+       private IStreamProcessor streamProcessor;
        @Autowired
        private SipManager sipManager;
 
-       boolean isConnected() {
+       @Override
+       public boolean isConnected() {
                boolean connctd = connected.get() && client != null && 
!client.isClosed();
                if (!connctd) {
                        log.warn(WARN_NO_KURENTO);
@@ -141,6 +143,7 @@ public class KurentoHandler {
                return connctd;
        }
 
+       @Override
        @PostConstruct
        public void init() {
                check = () -> {
@@ -196,6 +199,7 @@ public class KurentoHandler {
                kmsRecheckScheduler.scheduleAtFixedRate(check, 0L, 
checkTimeout, MILLISECONDS);
        }
 
+       @Override
        @PreDestroy
        public void destroy() {
                clean();
@@ -233,6 +237,7 @@ public class KurentoHandler {
                return client.beginTransaction();
        }
 
+       @Override
        public void onMessage(IWsClient inClient, JSONObject msg) {
                if (!isConnected()) {
                        sendError(inClient, "Multimedia server is 
inaccessible");
@@ -252,6 +257,7 @@ public class KurentoHandler {
                }
        }
 
+       @Override
        public JSONObject getRecordingUser(Long roomId) {
                if (!isConnected()) {
                        return new JSONObject();
@@ -259,6 +265,7 @@ public class KurentoHandler {
                return getRoom(roomId).getRecordingUser();
        }
 
+       @Override
        public void leaveRoom(Client c) {
                remove(c);
                WebSocketHelper.sendAll(newKurentoMsg()
@@ -268,23 +275,27 @@ public class KurentoHandler {
                        );
        }
 
-       void sendShareUpdated(StreamDesc sd) {
+       @Override
+       public void sendShareUpdated(StreamDesc sd) {
                sendClient(sd.getSid(), newKurentoMsg()
                                .put("id", "shareUpdated")
                                .put("stream", sd.toJson())
                        );
        }
 
+       @Override
        public void sendClient(String sid, JSONObject msg) {
                WebSocketHelper.sendClient(cm.getBySid(sid), msg);
        }
 
-       public static void sendError(IWsClient c, String msg) {
+       @Override
+       public void sendError(IWsClient c, String msg) {
                WebSocketHelper.sendClient(c, newKurentoMsg()
                                .put("id", "error")
                                .put("message", msg));
        }
 
+       @Override
        public void remove(IWsClient c) {
                if (!isConnected() || c == null) {
                        return;
@@ -296,7 +307,8 @@ public class KurentoHandler {
                streamProcessor.remove((Client)c);
        }
 
-       MediaPipeline createPipiline(Map<String, String> tags, 
Continuation<Void> continuation) {
+       @Override
+       public MediaPipeline createPipiline(Map<String, String> tags, 
Continuation<Void> continuation) {
                Transaction t = beginTransaction();
                MediaPipeline pipe = client.createMediaPipeline(t);
                pipe.addTag(t, TAG_KUID, kuid);
@@ -305,23 +317,27 @@ public class KurentoHandler {
                return pipe;
        }
 
-       KRoom getRoom(Long roomId) {
+       @Override
+       public KRoom getRoom(Long roomId) {
                return rooms.computeIfAbsent(roomId, k -> {
                        log.debug("Room {} does not exist. Will create now!", 
roomId);
                        Room r = roomDao.get(roomId);
-                       return new KRoom(this, r);
+                       return new KRoom(this, r, cm, recDao);
                });
        }
 
+       @Override
        public Collection<KRoom> getRooms() {
                return rooms.values();
        }
 
+       @Override
        public void updateSipCount(Room r, long count) {
                getRoom(r.getId()).updateSipCount(count);
        }
 
-       static JSONObject newKurentoMsg() {
+       @Override
+       public JSONObject newKurentoMsg() {
                return new JSONObject().put("type", KURENTO_TYPE);
        }
 
@@ -343,11 +359,13 @@ public class KurentoHandler {
                return r;
        }
 
+       @Override
        public JSONArray getTurnServers(Client c) {
                return getTurnServers(c, false);
        }
 
-       JSONArray getTurnServers(Client c, final boolean test) {
+       @Override
+       public JSONArray getTurnServers(Client c, final boolean test) {
                JSONArray arr = new JSONArray();
                if (!Strings.isEmpty(turnUrl)) {
                        try {
@@ -397,23 +415,28 @@ public class KurentoHandler {
                return kuid;
        }
 
+       @Override
        public TestStreamProcessor getTestProcessor() {
                return testProcessor;
        }
 
-       StreamProcessor getStreamProcessor() {
+       @Override
+       public IStreamProcessor getStreamProcessor() {
                return streamProcessor;
        }
 
-       SipManager getSipManager() {
+       @Override
+       public SipManager getSipManager() {
                return sipManager;
        }
 
-       RecordingChunkDao getChunkDao() {
+       @Override
+       public RecordingChunkDao getChunkDao() {
                return chunkDao;
        }
 
-       static int getFlowoutTimeout() {
+       @Override
+       public int getFlowoutTimeout() {
                return flowoutTimeout;
        }
 
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
index 3771c0c..368758d 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java
@@ -21,7 +21,6 @@ package org.apache.openmeetings.core.remote;
 
 import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
 import static 
org.apache.openmeetings.core.remote.KurentoHandler.activityAllowed;
-import static org.apache.openmeetings.core.remote.KurentoHandler.newKurentoMsg;
 import static 
org.apache.openmeetings.util.OpenmeetingsVariables.isRecordingsEnabled;
 
 import java.util.Collection;
@@ -37,7 +36,6 @@ import 
org.apache.openmeetings.core.converter.IRecordingConverter;
 import org.apache.openmeetings.core.converter.InterviewConverter;
 import org.apache.openmeetings.core.converter.RecordingConverter;
 import org.apache.openmeetings.core.util.WebSocketHelper;
-import org.apache.openmeetings.db.dao.record.RecordingDao;
 import org.apache.openmeetings.db.entity.basic.Client;
 import org.apache.openmeetings.db.entity.basic.Client.Activity;
 import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
@@ -54,12 +52,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.task.TaskExecutor;
-import org.springframework.stereotype.Component;
 
 import com.github.openjson.JSONArray;
 import com.github.openjson.JSONObject;
 
-@Component
 public class StreamProcessor implements IStreamProcessor {
        private static final Logger log = 
LoggerFactory.getLogger(StreamProcessor.class);
        /**
@@ -70,9 +66,7 @@ public class StreamProcessor implements IStreamProcessor {
        @Autowired
        private IClientManager cm;
        @Autowired
-       private RecordingDao recDao;
-       @Autowired
-       private KurentoHandler kHandler;
+       private IKurentoHandler kHandler;
        @Autowired
        private TaskExecutor taskExecutor;
        @Autowired
@@ -83,7 +77,8 @@ public class StreamProcessor implements IStreamProcessor {
        private StreamProcessorActions streamProcessorActions;
 
        @TimedApplication
-       void onMessage(Client c, final String cmdId, JSONObject msg) {
+       @Override
+       public void onMessage(Client c, final String cmdId, JSONObject msg) {
                final String uid = msg.optString("uid");
                StreamDesc sd;
                Optional<StreamDesc> osd;
@@ -163,7 +158,8 @@ public class StreamProcessor implements IStreamProcessor {
         * @param then steps need to be done after broadcast is started
         * @return the current KStream
         */
-       void startBroadcast(KStream stream, StreamDesc sd, String sdpOffer, 
Runnable then) {
+       @Override
+       public void startBroadcast(KStream stream, StreamDesc sd, String 
sdpOffer, Runnable then) {
                stream.startBroadcast(sd, sdpOffer, then);
        }
 
@@ -186,6 +182,7 @@ public class StreamProcessor implements IStreamProcessor {
                return closed;
        }
 
+       @Override
        @TimedApplication
        public void toggleActivity(Client c, Activity a) {
                log.info("PARTICIPANT {}: trying to toggle activity {}", c, a);
@@ -218,11 +215,11 @@ public class StreamProcessor implements IStreamProcessor {
                                Set<String> closed = wasBroadcasting ? 
cleanWebCams(c, streams) : Set.of();
                                cm.update(c.restoreActivities(sd));
                                log.debug("User {}: has started broadcast", 
sd.getUid());
-                               kHandler.sendClient(sd.getSid(), newKurentoMsg()
+                               kHandler.sendClient(sd.getSid(), 
kHandler.newKurentoMsg()
                                                .put("id", "broadcast")
                                                .put("stream", sd.toJson(true))
                                                .put("cleanup", new 
JSONArray(closed))
-                                               .put(PARAM_ICE, 
kHandler.getTurnServers(c, false)));
+                                               .put(PARAM_ICE, 
kHandler.getTurnServers(c)));
                        }
                }
        }
@@ -238,6 +235,7 @@ public class StreamProcessor implements IStreamProcessor {
                        });
        }
 
+       @Override
        public void rightsUpdated(Client c) {
                Optional<StreamDesc> osd = c.getScreenStream();
                if (osd.isPresent() && !hasRightsToShare(c)) {
@@ -288,6 +286,7 @@ public class StreamProcessor implements IStreamProcessor {
        }
 
        // Sharing
+       @Override
        public boolean hasRightsToShare(Client c) {
                if (!kHandler.isConnected()) {
                        return false;
@@ -298,6 +297,7 @@ public class StreamProcessor implements IStreamProcessor {
                                && c.hasRight(Right.SHARE);
        }
 
+       @Override
        public boolean screenShareAllowed(Client c) {
                Room r = c.getRoom();
                return hasRightsToShare(c) && !isSharing(r.getId());
@@ -322,7 +322,7 @@ public class StreamProcessor implements IStreamProcessor {
 
        private void startSharing(Client c, Optional<StreamDesc> osd, 
JSONObject msg, Activity a) {
                if (kHandler.isConnected() && c.getRoomId() != null) {
-                       kHandler.getRoom(c.getRoomId()).startSharing(this, cm, 
c, osd, msg, a);
+                       kHandler.getRoom(c.getRoomId()).startSharing(cm, c, 
osd, msg, a);
                }
        }
 
@@ -334,7 +334,8 @@ public class StreamProcessor implements IStreamProcessor {
         * @param c client
         * @param uid the uid
         */
-       void pauseSharing(Client c, String uid) {
+       @Override
+       public void pauseSharing(Client c, String uid) {
                if (!hasRightsToShare(c)) {
                        return;
                }
@@ -364,7 +365,8 @@ public class StreamProcessor implements IStreamProcessor {
                }
        }
 
-       StreamDesc doStopSharing(String sid, String uid) {
+       @Override
+       public StreamDesc doStopSharing(String sid, String uid) {
                return doStopSharing(getBySid(sid), uid);
        }
 
@@ -385,6 +387,7 @@ public class StreamProcessor implements IStreamProcessor {
                return sd;
        }
 
+       @Override
        public boolean isSharing(Long roomId) {
                if (!kHandler.isConnected()) {
                        return false;
@@ -394,11 +397,13 @@ public class StreamProcessor implements IStreamProcessor {
 
        // Recording
 
+       @Override
        public boolean hasRightsToRecord(Client c) {
                Room r = c.getRoom();
                return isRecordingsEnabled() && r != null && 
r.isAllowRecording() && c.hasRight(Right.MODERATOR);
        }
 
+       @Override
        public boolean recordingAllowed(Client c) {
                if (!kHandler.isConnected() || !isRecordingsEnabled()) {
                        return false;
@@ -407,6 +412,7 @@ public class StreamProcessor implements IStreamProcessor {
                return hasRightsToRecord(c) && !isRecording(r.getId());
        }
 
+       @Override
        public void startRecording(Client c) {
                if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
                        return;
@@ -414,6 +420,7 @@ public class StreamProcessor implements IStreamProcessor {
                kHandler.getRoom(c.getRoomId()).startRecording(c);
        }
 
+       @Override
        public void stopRecording(Client c) {
                if (!kHandler.isConnected() || !hasRightsToRecord(c)) {
                        return;
@@ -434,12 +441,14 @@ public class StreamProcessor implements IStreamProcessor {
         * @param rec
         * @return
         */
-       boolean startConvertion(Recording rec) {
+       @Override
+       public boolean startConvertion(Recording rec) {
                IRecordingConverter conv = rec.isInterview() ? 
interviewConverter : recordingConverter;
                taskExecutor.execute(() -> conv.startConversion(rec));
                return true;
        }
 
+       @Override
        public boolean isRecording(Long roomId) {
                if (!kHandler.isConnected()) {
                        return false;
@@ -447,7 +456,8 @@ public class StreamProcessor implements IStreamProcessor {
                return kHandler.getRoom(roomId).isRecording();
        }
 
-       void remove(Client c) {
+       @Override
+       public void remove(Client c) {
                for (StreamDesc sd : c.getStreams()) {
                        AbstractStream s = getByUid(sd.getUid());
                        if (s != null) {
@@ -461,32 +471,38 @@ public class StreamProcessor implements IStreamProcessor {
                }
        }
 
-       void addStream(KStream stream) {
+       @Override
+       public void addStream(KStream stream) {
                streamByUid.put(stream.getUid(), stream);
        }
 
+       @Override
        public Collection<KStream> getStreams() {
                return streamByUid.values();
        }
 
-       Stream<KStream> getByRoom(Long roomId) {
+       @Override
+       public Stream<KStream> getByRoom(Long roomId) {
                return streamByUid.values().stream()
                                .filter(stream -> 
stream.getRoomId().equals(roomId));
        }
 
-       Client getBySid(String sid) {
+       @Override
+       public Client getBySid(String sid) {
                return cm.getBySid(sid);
        }
 
+       @Override
        public boolean hasStream(String uid) {
                return streamByUid.get(uid) != null;
        }
 
-       KStream getByUid(String uid) {
+       @Override
+       public KStream getByUid(String uid) {
                return uid == null ? null : streamByUid.get(uid);
        }
 
-       KurentoHandler getHandler() {
+       IKurentoHandler getHandler() {
                return kHandler;
        }
 
@@ -494,10 +510,6 @@ public class StreamProcessor implements IStreamProcessor {
                return cm;
        }
 
-       RecordingDao getRecordingDao() {
-               return recDao;
-       }
-
        @Override
        public void release(AbstractStream stream, boolean releaseStream) {
                final String uid = stream.getUid();
@@ -528,8 +540,9 @@ public class StreamProcessor implements IStreamProcessor {
                }
        }
 
-       protected static JSONObject newStoppedMsg(StreamDesc sd) {
-               return newKurentoMsg()
+       @Override
+       public JSONObject newStoppedMsg(StreamDesc sd) {
+               return kHandler.newKurentoMsg()
                                .put("id", "broadcastStopped")
                                .put("uid", sd.getUid());
        }
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java
index 8608485..d881e39 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessorActions.java
@@ -20,7 +20,6 @@
 package org.apache.openmeetings.core.remote;
 
 import static 
org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE;
-import static org.apache.openmeetings.core.remote.KurentoHandler.sendError;
 
 import org.apache.openmeetings.core.util.WebSocketHelper;
 import org.apache.openmeetings.db.entity.basic.Client;
@@ -46,9 +45,9 @@ public class StreamProcessorActions {
        @Autowired
        private IClientManager cm;
        @Autowired
-       private KurentoHandler kHandler;
+       private IKurentoHandler kHandler;
        @Autowired
-       private StreamProcessor streamProcessor;
+       private IStreamProcessor streamProcessor;
 
        @TimedApplication
        protected void addIceCandidate(JSONObject msg) {
@@ -120,8 +119,8 @@ public class StreamProcessorActions {
                        });
                } catch (KurentoServerException e) {
                        sender.release();
-                       WebSocketHelper.sendClient(c, 
StreamProcessor.newStoppedMsg(sd));
-                       sendError(c, "Failed to start broadcast: " + 
e.getMessage());
+                       WebSocketHelper.sendClient(c, 
streamProcessor.newStoppedMsg(sd));
+                       kHandler.sendError(c, "Failed to start broadcast: " + 
e.getMessage());
                        log.error("Failed to start broadcast", e);
                }
        }
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
index 499d772..bf3c5f1 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/TestStreamProcessor.java
@@ -24,12 +24,18 @@ import static 
org.apache.openmeetings.core.remote.KurentoHandler.PARAM_CANDIDATE
 import static org.apache.openmeetings.core.remote.KurentoHandler.PARAM_ICE;
 import static org.apache.openmeetings.core.remote.KurentoHandler.TAG_MODE;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
 
 import org.apache.openmeetings.core.util.WebSocketHelper;
+import org.apache.openmeetings.db.entity.basic.Client;
+import org.apache.openmeetings.db.entity.basic.Client.Activity;
+import org.apache.openmeetings.db.entity.basic.Client.StreamDesc;
 import org.apache.openmeetings.db.entity.basic.IWsClient;
+import org.apache.openmeetings.db.entity.record.Recording;
 import org.kurento.client.IceCandidate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -41,10 +47,10 @@ class TestStreamProcessor implements IStreamProcessor {
        private final Map<String, KTestStream> streamByUid = new 
ConcurrentHashMap<>();
 
        @Autowired
-       private KurentoHandler kHandler;
+       private IKurentoHandler kHandler;
 
        void onMessage(IWsClient c, final String cmdId, JSONObject msg) {
-               KTestStream user = getByUid(c.getUid());
+               KTestStream user = getByUid1(c.getUid());
                switch (cmdId) {
                        case "wannaRecord":
                                WebSocketHelper.sendClient(c, 
newTestKurentoMsg()
@@ -84,16 +90,16 @@ class TestStreamProcessor implements IStreamProcessor {
                }
        }
 
-       private KTestStream getByUid(String uid) {
+       public KTestStream getByUid1(String uid) {
                return uid == null ? null : streamByUid.get(uid);
        }
 
-       static JSONObject newTestKurentoMsg() {
-               return KurentoHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST);
+       JSONObject newTestKurentoMsg() {
+               return kHandler.newKurentoMsg().put(TAG_MODE, MODE_TEST);
        }
 
        void remove(IWsClient c) {
-               AbstractStream s = getByUid(c.getUid());
+               AbstractStream s = getByUid1(c.getUid());
                if (s != null) {
                        s.release();
                }
@@ -112,4 +118,133 @@ class TestStreamProcessor implements IStreamProcessor {
                }
                streamByUid.clear();
        }
+
+       @Override
+       public void toggleActivity(Client c, Activity a) {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public void rightsUpdated(Client c) {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public boolean hasRightsToShare(Client c) {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       @Override
+       public boolean screenShareAllowed(Client c) {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       @Override
+       public boolean isSharing(Long roomId) {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       @Override
+       public boolean hasRightsToRecord(Client c) {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       @Override
+       public boolean recordingAllowed(Client c) {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       @Override
+       public void startRecording(Client c) {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public void stopRecording(Client c) {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public boolean isRecording(Long roomId) {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       @Override
+       public Collection<KStream> getStreams() {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+       @Override
+       public boolean hasStream(String uid) {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       @Override
+       public void startBroadcast(KStream stream, StreamDesc sd, String 
sdpOffer, Runnable then) {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public boolean startConvertion(Recording rec) {
+               // TODO Auto-generated method stub
+               return false;
+       }
+
+       @Override
+       public void pauseSharing(Client c, String uid) {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public Client getBySid(String sid) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+       @Override
+       public void addStream(KStream stream) {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public StreamDesc doStopSharing(String sid, String uid) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+       @Override
+       public Stream<KStream> getByRoom(Long roomId) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+       @Override
+       public KStream getByUid(String uid) {
+               // TODO Auto-generated method stub
+               return null;
+       }
+
+       @Override
+       public void onMessage(Client c, String cmdId, JSONObject msg) {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public void remove(Client c) {
+               // TODO Auto-generated method stub
+       }
+
+       @Override
+       public JSONObject newStoppedMsg(StreamDesc sd) {
+               // TODO Auto-generated method stub
+               return null;
+       }
 }

Reply via email to