This is an automated email from the ASF dual-hosted git repository.

solomax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openmeetings.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f6f03f  [OPENMEETINGS-1649] test record/play seems to work
6f6f03f is described below

commit 6f6f03ff24db05c4044093f1592e96622bfc84b0
Author: Maxim Solodovnik <solomax...@gmail.com>
AuthorDate: Fri Oct 5 14:13:16 2018 +0700

    [OPENMEETINGS-1649] test record/play seems to work
---
 .../openmeetings/core/remote/KTestStream.java      |  48 +++---
 .../openmeetings/core/remote/KurentoHandler.java   | 191 +++++++++++----------
 .../apache/openmeetings/web/room/raw-settings.js   |  18 +-
 .../apache/openmeetings/web/room/raw-video-util.js |   6 +
 4 files changed, 139 insertions(+), 124 deletions(-)

diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
index ab034ed..4e23dd5 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KTestStream.java
@@ -25,6 +25,7 @@ import static 
org.apache.openmeetings.util.OmFileHelper.getStreamsDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -40,6 +41,7 @@ import org.kurento.client.IceCandidate;
 import org.kurento.client.IceCandidateFoundEvent;
 import org.kurento.client.MediaPipeline;
 import org.kurento.client.MediaProfileSpecType;
+import org.kurento.client.MediaSessionStartedEvent;
 import org.kurento.client.MediaType;
 import org.kurento.client.PlayerEndpoint;
 import org.kurento.client.RecorderEndpoint;
@@ -68,8 +70,7 @@ public class KTestStream implements IKStream {
                webRtcEndpoint.connect(webRtcEndpoint);
 
                MediaProfileSpecType profile = getProfile(msg);
-               //FIXME TODO generated file uid
-               initRecPath(_c.getUid());
+               initRecPath();
                recorder = new RecorderEndpoint.Builder(pipeline, recPath)
                                .stopOnEndOfStream()
                                .withMediaProfile(profile).build();
@@ -110,11 +111,9 @@ public class KTestStream implements IKStream {
                                break;
                }
 
-               // 3. SDP negotiation
                String sdpOffer = msg.getString("sdpOffer");
                String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer);
 
-               // 4. Gather ICE candidates
                addIceListener(_c);
 
                WebSocketHelper.sendClient(_c, newTestKurentoMsg()
@@ -136,38 +135,37 @@ public class KTestStream implements IKStream {
        }
 
        public void play(final IWsClient _c, JSONObject msg, MediaPipeline 
pipeline) {
-               // 1. Media logic
                this.pipeline = pipeline;
                webRtcEndpoint = new WebRtcEndpoint.Builder(pipeline).build();
                player = new PlayerEndpoint.Builder(pipeline, recPath).build();
                player.connect(webRtcEndpoint);
-
-               // Player listeners
-               player.addErrorListener(new EventListener<ErrorEvent>() {
+               webRtcEndpoint.addMediaSessionStartedListener(new 
EventListener<MediaSessionStartedEvent>() {
                        @Override
-                       public void onEvent(ErrorEvent event) {
-                               log.info("ErrorEvent for player with uid '{}': 
{}", _c.getUid(), event.getDescription());
-                               sendPlayEnd(_c);
-                       }
-               });
-               player.addEndOfStreamListener(new 
EventListener<EndOfStreamEvent>() {
-                       @Override
-                       public void onEvent(EndOfStreamEvent event) {
-                               log.info("EndOfStreamEvent for player with uid 
'{}'", _c.getUid());
-                               sendPlayEnd(_c);
+                       public void onEvent(MediaSessionStartedEvent event) {
+                               log.info("Media session started {}", event);
+                               player.addErrorListener(new 
EventListener<ErrorEvent>() {
+                                       @Override
+                                       public void onEvent(ErrorEvent event) {
+                                               log.info("ErrorEvent for player 
with uid '{}': {}", _c.getUid(), event.getDescription());
+                                               sendPlayEnd(_c);
+                                       }
+                               });
+                               player.addEndOfStreamListener(new 
EventListener<EndOfStreamEvent>() {
+                                       @Override
+                                       public void onEvent(EndOfStreamEvent 
event) {
+                                               log.info("EndOfStreamEvent for 
player with uid '{}'", _c.getUid());
+                                               sendPlayEnd(_c);
+                                       }
+                               });
+                               player.play();
                        }
                });
 
-               // 3. SDP negotiation
                String sdpOffer = msg.getString("sdpOffer");
                String sdpAnswer = webRtcEndpoint.processOffer(sdpOffer);
 
-               // 4. Gather ICE candidates
                addIceListener(_c);
 
-               // 5. Play recorded stream
-               player.play();
-
                WebSocketHelper.sendClient(_c, newTestKurentoMsg()
                                .put("id", "playResponse")
                                .put("sdpAnswer", sdpAnswer));
@@ -213,9 +211,9 @@ public class KTestStream implements IKStream {
                }
        }
 
-       private void initRecPath(String uid) {
+       private void initRecPath() {
                try {
-                       File f = new File(getStreamsDir(), 
String.format("%s%s.webm", TEST_SETUP_PREFIX, uid));
+                       File f = new File(getStreamsDir(), 
String.format("%s%s.webm", TEST_SETUP_PREFIX, UUID.randomUUID()));
                        recPath = String.format("file://%s", 
f.getCanonicalPath());
                        log.info("Configured to record to {}", recPath);
                } catch (IOException e) {
diff --git 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
index 6e52c7b..12fac99 100644
--- 
a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
+++ 
b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java
@@ -55,6 +55,7 @@ import org.kurento.client.ObjectCreatedEvent;
 import org.kurento.client.PlayerEndpoint;
 import org.kurento.client.RecorderEndpoint;
 import org.kurento.client.Tag;
+import org.kurento.client.Transaction;
 import org.kurento.client.WebRtcEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,10 +66,10 @@ import com.github.openjson.JSONObject;
 
 public class KurentoHandler {
        private final static Logger log = 
LoggerFactory.getLogger(KurentoHandler.class);
-       private final static String MODE_TEST = "test";
-       private final static String TAG_KUID = "kuid";
-       private final static String TAG_MODE = "mode";
-       private final static String TAG_ROOM = "roomId";
+       final static String MODE_TEST = "test";
+       final static String TAG_KUID = "kuid";
+       final static String TAG_MODE = "mode";
+       final static String TAG_ROOM = "roomId";
        private final static String HMAC_SHA1_ALGORITHM = "HmacSHA1";
        public final static String KURENTO_TYPE = "kurento";
        private long checkTimeout = 200; //ms
@@ -80,7 +81,6 @@ public class KurentoHandler {
        private int turnTtl = 60; //minutes
        private KurentoClient client;
        private String kuid;
-       private ScheduledExecutorService scheduler;
        private final Map<Long, KRoom> rooms = new ConcurrentHashMap<>();
        final Map<String, KStream> usersByUid = new ConcurrentHashMap<>();
        final Map<String, KTestStream> testsByUid = new ConcurrentHashMap<>();
@@ -92,74 +92,8 @@ public class KurentoHandler {
                try {
                        // TODO check connection, reconnect, listeners etc.
                        client = KurentoClient.create(kurentoWsUrl);
-                       client.getServerManager().addObjectCreatedListener(new 
EventListener<ObjectCreatedEvent>() {
-                               @Override
-                               public void onEvent(ObjectCreatedEvent evt) {
-                                       log.debug("Kurento::ObjectCreated -> 
{}", evt.getObject());
-                                       if (evt.getObject() instanceof 
MediaPipeline) {
-                                               // room created
-                                               final String roid = 
evt.getObject().getId();
-                                               scheduler.schedule(() -> {
-                                                       if (client == null) {
-                                                               return;
-                                                       }
-                                                       // still alive
-                                                       MediaPipeline pipe = 
client.getById(roid, MediaPipeline.class);
-                                                       try {
-                                                               Map<String, 
String> tags = tagsAsMap(pipe);
-                                                               if 
(validTestPipeline(tags)) {
-                                                                       return;
-                                                               }
-                                                               if 
(kuid.equals(tags.get(TAG_KUID))) {
-                                                                       KRoom r 
= rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
-                                                                       if 
(r.getPipelineId().equals(pipe.getId())) {
-                                                                               
return;
-                                                                       } else 
if (r != null) {
-                                                                               
rooms.remove(r.getRoomId());
-                                                                               
r.close();
-                                                                       }
-                                                               }
-                                                       } catch(Exception e) {
-                                                               //no-op, 
connect will be dropped
-                                                       }
-                                                       log.warn("Invalid 
MediaPipeline {} detected, will be dropped", pipe.getId());
-                                                       pipe.release();
-                                               }, checkTimeout, MILLISECONDS);
-                                       } else if (evt.getObject() instanceof 
Endpoint) {
-                                               // endpoint created
-                                               Endpoint _point = 
(Endpoint)evt.getObject();
-                                               final String eoid = 
_point.getId();
-                                               Class<? extends Endpoint> 
_clazz = null;
-                                               if (_point instanceof 
WebRtcEndpoint) {
-                                                       _clazz = 
WebRtcEndpoint.class;
-                                               } else if (_point instanceof 
RecorderEndpoint) {
-                                                       _clazz = 
RecorderEndpoint.class;
-                                               } else if (_point instanceof 
PlayerEndpoint) {
-                                                       _clazz = 
PlayerEndpoint.class;
-                                               }
-                                               final Class<? extends Endpoint> 
clazz = _clazz;
-                                               scheduler.schedule(() -> {
-                                                       if (client == null || 
clazz == null) {
-                                                               return;
-                                                       }
-                                                       // still alive
-                                                       Endpoint point = 
client.getById(eoid, clazz);
-                                                       if 
(validTestPipeline(point.getMediaPipeline())) {
-                                                               return;
-                                                       }
-                                                       Map<String, String> 
tags = tagsAsMap(point);
-                                                       KStream stream = 
getByUid(tags.get("suid"));
-                                                       if (stream != null && 
stream.contains(tags.get("uid"))) {
-                                                               return;
-                                                       }
-                                                       log.warn("Invalid 
Endpoint {} detected, will be dropped", point.getId());
-                                                       point.release();
-                                               }, checkTimeout, MILLISECONDS);
-                                       }
-                               }
-                       });
                        kuid = UUID.randomUUID().toString(); //FIXME TODO 
regenerate on re-connect
-                       scheduler = Executors.newScheduledThreadPool(10);
+                       client.getServerManager().addObjectCreatedListener(new 
KWatchDog(client, kuid, checkTimeout));
                } catch (Exception e) {
                        log.error("Fail to create Kurento client", e);
                }
@@ -180,7 +114,7 @@ public class KurentoHandler {
                }
        }
 
-       private static Map<String, String> tagsAsMap(MediaObject pipe) {
+       static Map<String, String> tagsAsMap(MediaObject pipe) {
                Map<String, String> map = new HashMap<>();
                for (Tag t : pipe.getTags()) {
                        map.put(t.getKey(), t.getValue());
@@ -188,19 +122,13 @@ public class KurentoHandler {
                return map;
        }
 
-       private boolean validTestPipeline(MediaPipeline pipeline) {
-               return validTestPipeline(tagsAsMap(pipeline));
-       }
-
-       private boolean validTestPipeline(Map<String, String> tags) {
-               return kuid.equals(tags.get(TAG_KUID)) && 
MODE_TEST.equals(tags.get(TAG_MODE)) && MODE_TEST.equals(tags.get(TAG_ROOM));
-       }
-
        private MediaPipeline createTestPipeline() {
-               MediaPipeline pipe = client.createMediaPipeline();
-               pipe.addTag(TAG_KUID, kuid);
-               pipe.addTag(TAG_MODE, MODE_TEST);
-               pipe.addTag(TAG_ROOM, MODE_TEST);
+               Transaction t = client.beginTransaction();
+               MediaPipeline pipe = client.createMediaPipeline(t);
+               pipe.addTag(t, TAG_KUID, kuid);
+               pipe.addTag(t, TAG_MODE, MODE_TEST);
+               pipe.addTag(t, TAG_ROOM, MODE_TEST);
+               t.commit();
                return pipe;
        }
 
@@ -216,7 +144,7 @@ public class KurentoHandler {
                                case "wannaRecord":
                                        WebSocketHelper.sendClient(_c, 
newTestKurentoMsg()
                                                        .put("id", "canRecord")
-                                                       .put("configuration", 
new JSONObject().put("iceServers", getTurnServers(true)))
+                                                       .put("iceServers", 
getTurnServers(true))
                                                        );
                                        break;
                                case "record":
@@ -239,7 +167,7 @@ public class KurentoHandler {
                                case "wannaPlay":
                                        WebSocketHelper.sendClient(_c, 
newTestKurentoMsg()
                                                        .put("id", "canPlay")
-                                                       .put("configuration", 
new JSONObject().put("iceServers", getTurnServers(true)))
+                                                       .put("iceServers", 
getTurnServers(true))
                                                        );
                                        break;
                                case "play":
@@ -464,7 +392,7 @@ public class KurentoHandler {
                                        Mac mac = 
Mac.getInstance(HMAC_SHA1_ALGORITHM);
                                        mac.init(new 
SecretKeySpec(turnSecret.getBytes(), HMAC_SHA1_ALGORITHM));
                                        StringBuilder user = new StringBuilder()
-                                                       .append((test ? 30 : 
turnTtl) + System.currentTimeMillis() / 1000L);
+                                                       .append((test ? 60 : 
turnTtl * 60) + System.currentTimeMillis() / 1000L);
                                        if (!Strings.isEmpty(turnUser)) {
                                                
user.append(':').append(turnUser);
                                        }
@@ -510,4 +438,91 @@ public class KurentoHandler {
        public void setTurnTtl(int turnTtl) {
                this.turnTtl = turnTtl;
        }
+
+       private class KWatchDog implements EventListener<ObjectCreatedEvent> {
+               private ScheduledExecutorService scheduler;
+               private final String kuid;
+               private final long checkTimeout;
+               private final KurentoClient client;
+
+               public KWatchDog(final KurentoClient client, final String kuid, 
final long checkTimeout) {
+                       this.client = client;
+                       this.kuid = kuid;
+                       this.checkTimeout = checkTimeout;
+                       scheduler = Executors.newScheduledThreadPool(10);
+               }
+
+               @Override
+               public void onEvent(ObjectCreatedEvent evt) {
+                       log.debug("Kurento::ObjectCreated -> {}", 
evt.getObject());
+                       if (evt.getObject() instanceof MediaPipeline) {
+                               // room created
+                               final String roid = evt.getObject().getId();
+                               scheduler.schedule(() -> {
+                                       if (client == null) {
+                                               return;
+                                       }
+                                       // still alive
+                                       MediaPipeline pipe = 
client.getById(roid, MediaPipeline.class);
+                                       Map<String, String> tags = 
tagsAsMap(pipe);
+                                       try {
+                                               if (validTestPipeline(tags)) {
+                                                       return;
+                                               }
+                                               if 
(kuid.equals(tags.get(TAG_KUID))) {
+                                                       KRoom r = 
rooms.get(Long.valueOf(tags.get(TAG_ROOM)));
+                                                       if 
(r.getPipelineId().equals(pipe.getId())) {
+                                                               return;
+                                                       } else if (r != null) {
+                                                               
rooms.remove(r.getRoomId());
+                                                               r.close();
+                                                       }
+                                               }
+                                       } catch(Exception e) {
+                                               //no-op, connect will be dropped
+                                       }
+                                       log.warn("Invalid MediaPipeline {} 
detected, will be dropped, tags: {}", pipe.getId(), tags);
+                                       pipe.release();
+                               }, checkTimeout, MILLISECONDS);
+                       } else if (evt.getObject() instanceof Endpoint) {
+                               // endpoint created
+                               Endpoint _point = (Endpoint)evt.getObject();
+                               final String eoid = _point.getId();
+                               Class<? extends Endpoint> _clazz = null;
+                               if (_point instanceof WebRtcEndpoint) {
+                                       _clazz = WebRtcEndpoint.class;
+                               } else if (_point instanceof RecorderEndpoint) {
+                                       _clazz = RecorderEndpoint.class;
+                               } else if (_point instanceof PlayerEndpoint) {
+                                       _clazz = PlayerEndpoint.class;
+                               }
+                               final Class<? extends Endpoint> clazz = _clazz;
+                               scheduler.schedule(() -> {
+                                       if (client == null || clazz == null) {
+                                               return;
+                                       }
+                                       // still alive
+                                       Endpoint point = client.getById(eoid, 
clazz);
+                                       if 
(validTestPipeline(point.getMediaPipeline())) {
+                                               return;
+                                       }
+                                       Map<String, String> tags = 
tagsAsMap(point);
+                                       KStream stream = 
getByUid(tags.get("suid"));
+                                       if (stream != null && 
stream.contains(tags.get("uid"))) {
+                                               return;
+                                       }
+                                       log.warn("Invalid Endpoint {} detected, 
will be dropped", point.getId());
+                                       point.release();
+                               }, checkTimeout, MILLISECONDS);
+                       }
+               }
+
+               private boolean validTestPipeline(MediaPipeline pipeline) {
+                       return validTestPipeline(tagsAsMap(pipeline));
+               }
+
+               private boolean validTestPipeline(Map<String, String> tags) {
+                       return kuid.equals(tags.get(TAG_KUID)) && 
MODE_TEST.equals(tags.get(TAG_MODE)) && MODE_TEST.equals(tags.get(TAG_ROOM));
+               }
+       }
 }
diff --git 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-settings.js
 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-settings.js
index 4292a20..89d0111 100644
--- 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-settings.js
+++ 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-settings.js
@@ -271,13 +271,10 @@ var VideoSettings = (function() {
                _clear();
                const cnts = _constraints();
                if (cnts.video !== false || cnts.audio !== false) {
-                       const options = {
+                       const options = VideoUtil.addIceServers({
                                localVideo: vid[0]
                                , mediaConstraints: cnts
-                       };
-                       if (msg && msg.configuration) {
-                               options.configuration = msg.configuration;
-                       }
+                       }, msg);
                        rtcPeer = new 
kurentoUtils.WebRtcPeer.WebRtcPeerSendonly(
                                options
                                , function(error) {
@@ -298,7 +295,9 @@ var VideoSettings = (function() {
                                        });
                                });
                }
-               _updateRec();
+               if (!msg) {
+                       _updateRec();
+               }
        }
 
        function _allowRec(allow) {
@@ -422,14 +421,11 @@ var VideoSettings = (function() {
                                                        break;
                                                case 'canPlay':
                                                        {
-                                                               const options = 
{
+                                                               const options = 
VideoUtil.addIceServers({
                                                                        
remoteVideo: vid[0]
                                                                        , 
mediaConstraints: {audio: true, video: true}
                                                                        , 
onicecandidate: _onIceCandidate
-                                                               };
-                                                               if (m && 
m.configuration) {
-                                                                       
options.configuration = m.configuration;
-                                                               }
+                                                               }, m);
                                                                _clear();
                                                                rtcPeer = new 
kurentoUtils.WebRtcPeer.WebRtcPeerRecvonly(
                                                                        options
diff --git 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-util.js
 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-util.js
index 44742b6..18b740f 100644
--- 
a/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-util.js
+++ 
b/openmeetings-web/src/main/java/org/apache/openmeetings/web/room/raw-video-util.js
@@ -135,5 +135,11 @@ var VideoUtil = (function() {
        self.arrange = _arrange;
        self.cleanStream = _cleanStream;
        self.cleanPeer = _cleanPeer;
+       self.addIceServers = function(opts, m) {
+               if (m && m.iceServers && m.iceServers.length > 0) {
+                       opts.configuration = {iceServers: m.iceServers};
+               }
+               return opts;
+       };
        return self;
 })();

Reply via email to