This is an automated email from the ASF dual-hosted git repository. solomax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openmeetings.git
The following commit(s) were added to refs/heads/master by this push: new 49363db [OPENMEETINGS-2239] initial commit with jain-sip involved 49363db is described below commit 49363db8e0515da3c2e9051e199f146af460349c Author: Maxim Solodovnik <solomax...@gmail.com> AuthorDate: Thu Oct 29 14:03:32 2020 +0700 [OPENMEETINGS-2239] initial commit with jain-sip involved --- .../openmeetings/core/remote/AbstractStream.java | 5 + .../org/apache/openmeetings/core/remote/KRoom.java | 42 ++- .../apache/openmeetings/core/remote/KStream.java | 101 +++++-- .../openmeetings/core/remote/KurentoHandler.java | 2 +- .../openmeetings/core/remote/StreamProcessor.java | 16 +- .../openmeetings/core/sip/ISipCallbacks.java | 23 ++ .../apache/openmeetings/core/sip/SipManager.java | 304 ++----------------- .../openmeetings/core/sip/SipStackProcessor.java | 329 +++++++++++++++++++++ .../db/entity/user/AsteriskSipUser.java | 14 +- .../src/site/markdown/AsteriskIntegration.md | 33 +-- .../web/admin/connection/KStreamDto.java | 2 +- 11 files changed, 497 insertions(+), 374 deletions(-) diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java index 6de2a46..7891125 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/AbstractStream.java @@ -22,6 +22,7 @@ import org.kurento.client.MediaPipeline; import org.kurento.client.MediaProfileSpecType; import org.kurento.client.PlayerEndpoint; import org.kurento.client.RecorderEndpoint; +import org.kurento.client.RtpEndpoint; import org.kurento.client.WebRtcEndpoint; public abstract class AbstractStream { @@ -60,4 +61,8 @@ public abstract class AbstractStream { public static PlayerEndpoint createPlayerEndpoint(MediaPipeline pipeline, String path) { return new PlayerEndpoint.Builder(pipeline, path).build(); } 
+ + public static RtpEndpoint createRtpEndpoint(MediaPipeline pipeline) { + return new RtpEndpoint.Builder(pipeline).build(); + } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java index 433a74e..d1f993d 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KRoom.java @@ -63,8 +63,7 @@ public class KRoom { */ private final StreamProcessor processor; private final RecordingChunkDao chunkDao; - private final Long roomId; - private final Room.Type type; + private final Room room; private final AtomicBoolean recordingStarted = new AtomicBoolean(false); private final AtomicBoolean sharingStarted = new AtomicBoolean(false); private Long recordingId = null; @@ -74,17 +73,12 @@ public class KRoom { public KRoom(KurentoHandler handler, Room r) { this.processor = handler.getStreamProcessor(); this.chunkDao = handler.getChunkDao(); - this.roomId = r.getId(); - this.type = r.getType(); - log.info("ROOM {} has been created", roomId); + this.room = r; + log.info("ROOM {} has been created", room.getId()); } - public Long getRoomId() { - return roomId; - } - - public Room.Type getType() { - return type; + public Room getRoom() { + return room; } public Long getRecordingId() { @@ -96,7 +90,7 @@ public class KRoom { } public KStream join(final StreamDesc sd, KurentoHandler kHandler) { - log.info("ROOM {}: join client {}, stream: {}", roomId, sd.getClient(), sd.getUid()); + log.info("ROOM {}: join client {}, stream: {}", room.getId(), sd.getClient(), sd.getUid()); final KStream stream = new KStream(sd, this, kHandler); processor.addStream(stream); return stream; @@ -125,7 +119,7 @@ public class KRoom { if (recordingStarted.compareAndSet(false, true)) { IApplication app = ensureApplication(c.getUser().getLanguageId()); - log.debug("##REC:: recording in 
room {} is starting ::", roomId); + log.debug("##REC:: recording in room {} is starting ::", room.getId()); Room r = c.getRoom(); boolean interview = Room.Type.INTERVIEW == r.getType(); @@ -147,7 +141,7 @@ public class KRoom { rec.setType(BaseFileItem.Type.RECORDING); rec.setInterview(interview); - rec.setRoomId(roomId); + rec.setRoomId(room.getId()); rec.setRecordStart(now); rec.setOwnerId(ownerId); @@ -164,18 +158,18 @@ public class KRoom { rec = processor.getRecordingDao().update(rec); // Receive recordingId recordingId = rec.getId(); - processor.getByRoom(this.getRoomId()).forEach(KStream::startRecord); + processor.getByRoom(room.getId()).forEach(KStream::startRecord); // Send notification to all users that the recording has been started - WebSocketHelper.sendRoom(new RoomMessage(roomId, u, RoomMessage.Type.RECORDING_TOGGLED)); - log.debug("##REC:: recording in room {} is started {} ::", roomId, recordingId); + WebSocketHelper.sendRoom(new RoomMessage(room.getId(), u, RoomMessage.Type.RECORDING_TOGGLED)); + log.debug("##REC:: recording in room {} is started {} ::", room.getId(), recordingId); } } public void stopRecording(Client c) { if (recordingStarted.compareAndSet(true, false)) { - log.debug("##REC:: recording in room {} is stopping {} ::", roomId, recordingId); - processor.getByRoom(this.getRoomId()).forEach(KStream::stopRecord); + log.debug("##REC:: recording in room {} is stopping {} ::", room.getId(), recordingId); + processor.getByRoom(room.getId()).forEach(KStream::stopRecord); Recording rec = processor.getRecordingDao().get(recordingId); rec.setRecordEnd(new Date()); rec = processor.getRecordingDao().update(rec); @@ -196,8 +190,8 @@ public class KRoom { } } // Send notification to all users that the recording has been started - WebSocketHelper.sendRoom(new RoomMessage(roomId, u, RoomMessage.Type.RECORDING_TOGGLED)); - log.debug("##REC:: recording in room {} is stopped ::", roomId); + WebSocketHelper.sendRoom(new RoomMessage(room.getId(), u, 
RoomMessage.Type.RECORDING_TOGGLED)); + log.debug("##REC:: recording in room {} is stopped ::", room.getId()); } } @@ -236,7 +230,7 @@ public class KRoom { cm.update(c); h.sendShareUpdated(sd); WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid())); - WebSocketHelper.sendRoomOthers(roomId, c.getUid(), newKurentoMsg() + WebSocketHelper.sendRoomOthers(room.getId(), c.getUid(), newKurentoMsg() .put("id", "newStream") .put(PARAM_ICE, processor.getHandler().getTurnServers(c)) .put("stream", sd.toJson())); @@ -250,7 +244,7 @@ public class KRoom { } public void close() { - processor.getByRoom(this.getRoomId()).forEach(KStream::release); - log.debug("Room {} closed", this.roomId); + processor.getByRoom(room.getId()).forEach(KStream::release); + log.debug("Room {} closed", room.getId()); } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java index 1b077eb..8410cd3 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KStream.java @@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import org.apache.openmeetings.core.sip.ISipCallbacks; +import org.apache.openmeetings.core.sip.SipStackProcessor; import org.apache.openmeetings.core.util.WebSocketHelper; import org.apache.openmeetings.db.entity.basic.Client; import org.apache.openmeetings.db.entity.basic.Client.Activity; @@ -52,6 +54,7 @@ import org.apache.openmeetings.db.util.ws.TextRoomMessage; import org.kurento.client.Continuation; import org.kurento.client.IceCandidate; import org.kurento.client.MediaFlowState; +import org.kurento.client.MediaObject; import org.kurento.client.MediaPipeline; import org.kurento.client.MediaProfileSpecType; 
import org.kurento.client.MediaType; @@ -64,28 +67,31 @@ import org.slf4j.LoggerFactory; import com.github.openjson.JSONObject; -public class KStream extends AbstractStream { +public class KStream extends AbstractStream implements ISipCallbacks { private static final Logger log = LoggerFactory.getLogger(KStream.class); private final KurentoHandler kHandler; - private final KRoom room; + private final KRoom kRoom; private final Date connectedSince; private final StreamType streamType; private MediaProfileSpecType profile; private MediaPipeline pipeline; private RecorderEndpoint recorder; private WebRtcEndpoint outgoingMedia = null; + private RtpEndpoint rtpEndpoint; + private Optional<SipStackProcessor> sipProcessor; private final ConcurrentMap<String, WebRtcEndpoint> listeners = new ConcurrentHashMap<>(); private Optional<CompletableFuture<Object>> flowoutFuture = Optional.empty(); private Long chunkId; private Type type; + private String sdpOffer; private boolean hasAudio; private boolean hasVideo; private boolean hasScreen; - public KStream(final StreamDesc sd, KRoom room, KurentoHandler kHandler) { + public KStream(final StreamDesc sd, KRoom kRoom, KurentoHandler kHandler) { super(sd.getSid(), sd.getUid()); - this.room = room; + this.kRoom = kRoom; streamType = sd.getType(); this.connectedSince = new Date(); this.kHandler = kHandler; @@ -130,7 +136,7 @@ public class KStream extends AbstractStream { profile = MediaProfileSpecType.WEBM_VIDEO_ONLY; break; } - pipeline = kHandler.createPipiline(Map.of(TAG_ROOM, String.valueOf(room.getRoomId()), TAG_STREAM_UID, sd.getUid()), new Continuation<Void>() { + pipeline = kHandler.createPipiline(Map.of(TAG_ROOM, String.valueOf(getRoomId()), TAG_STREAM_UID, sd.getUid()), new Continuation<Void>() { @Override public void onSuccess(Void result) throws Exception { internalStartBroadcast(sd, sdpOffer); @@ -144,7 +150,8 @@ public class KStream extends AbstractStream { }); } - private void internalStartBroadcast(final StreamDesc sd, 
final String sdpOffer) { + private void internalStartBroadcast(final StreamDesc sd, final String sdpOffer) throws Exception { + this.sdpOffer = sdpOffer; outgoingMedia = createEndpoint(sd.getSid(), sd.getUid()); outgoingMedia.addMediaSessionTerminatedListener(evt -> log.warn("Media stream terminated {}", sd)); outgoingMedia.addMediaFlowOutStateChangeListener(evt -> { @@ -169,29 +176,29 @@ public class KStream extends AbstractStream { }); outgoingMedia.addMediaFlowInStateChangeListener(evt -> log.warn("Media FlowIn :: {}", evt)); addListener(sd.getSid(), sd.getUid(), sdpOffer); - if (room.isRecording()) { + sipProcessor = kHandler.getSipManager().createSipStackProcessor( + randomUUID().toString() + , kRoom.getRoom() + , this); // TODO check this + sipProcessor.ifPresent(ssp -> { + ssp.register(); + }); + if (kRoom.isRecording()) { startRecord(); } Client c = sd.getClient(); WebSocketHelper.sendRoom(new TextRoomMessage(c.getRoomId(), c, RoomMessage.Type.RIGHT_UPDATED, c.getUid())); if (hasAudio || hasVideo || hasScreen) { - WebSocketHelper.sendRoomOthers(room.getRoomId(), c.getUid(), newKurentoMsg() + WebSocketHelper.sendRoomOthers(getRoomId(), c.getUid(), newKurentoMsg() .put("id", "newStream") .put(PARAM_ICE, kHandler.getTurnServers(c)) .put("stream", sd.toJson())); } } - private RtpEndpoint createRtpEndpoint(MediaPipeline pipeline) { - RtpEndpoint endpoint = new RtpEndpoint.Builder(pipeline).build(); - endpoint.addTag("outUid", this.uid); - endpoint.addTag("uid", uid); - return endpoint; - } - public void addListener(String sid, String uid, String sdpOffer) { final boolean self = uid.equals(this.uid); - log.info("USER {}: have started {} in room {}", uid, self ? "broadcasting" : "receiving", room.getRoomId()); + log.info("USER {}: have started {} in kRoom {}", uid, self ? 
"broadcasting" : "receiving", getRoomId()); log.trace("USER {}: SdpOffer is {}", uid, sdpOffer); if (!self && outgoingMedia == null) { log.warn("Trying to add listener too early"); @@ -246,10 +253,20 @@ public class KStream extends AbstractStream { return listener; } - private WebRtcEndpoint createEndpoint(String sid, String uid) { - WebRtcEndpoint endpoint = createWebRtcEndpoint(pipeline); + private void setTags(MediaObject endpoint, String uid) { endpoint.addTag("outUid", this.uid); endpoint.addTag("uid", uid); + } + + private RtpEndpoint getRtpEndpoint(MediaPipeline pipeline) { + RtpEndpoint endpoint = new RtpEndpoint.Builder(pipeline).build(); + setTags(endpoint, uid); + return endpoint; + } + + private WebRtcEndpoint createEndpoint(String sid, String uid) { + WebRtcEndpoint endpoint = createWebRtcEndpoint(pipeline); + setTags(endpoint, uid); endpoint.addIceCandidateFoundListener(evt -> kHandler.sendClient(sid , newKurentoMsg() @@ -266,13 +283,12 @@ public class KStream extends AbstractStream { release(true); return; } - final String chunkUid = "rec_" + room.getRecordingId() + "_" + randomUUID(); - recorder = createRecorderEndpoint(pipeline, getRecUri(getRecordingChunk(room.getRoomId(), chunkUid)), profile); - recorder.addTag("outUid", uid); - recorder.addTag("uid", uid); + final String chunkUid = "rec_" + kRoom.getRecordingId() + "_" + randomUUID(); + recorder = createRecorderEndpoint(pipeline, getRecUri(getRecordingChunk(getRoomId(), chunkUid)), profile); + setTags(recorder, uid); - recorder.addRecordingListener(evt -> chunkId = room.getChunkDao().start(room.getRecordingId(), type, chunkUid, sid)); - recorder.addStoppedListener(evt -> room.getChunkDao().stop(chunkId)); + recorder.addRecordingListener(evt -> chunkId = kRoom.getChunkDao().start(kRoom.getRecordingId(), type, chunkUid, sid)); + recorder.addStoppedListener(evt -> kRoom.getChunkDao().stop(chunkId)); switch (profile) { case WEBM: outgoingMedia.connect(recorder, MediaType.AUDIO); @@ -312,7 +328,7 @@ 
public class KStream extends AbstractStream { } public void stopBroadcast() { - room.onStopBroadcast(this); + kRoom.onStopBroadcast(this); } public void pauseSharing() { @@ -357,6 +373,7 @@ public class KStream extends AbstractStream { if (outgoingMedia != null) { releaseListeners(); releaseRecorder(false); + releaseRtp(); outgoingMedia.release(new Continuation<Void>() { @Override public void onSuccess(Void result) throws Exception { @@ -429,6 +446,24 @@ public class KStream extends AbstractStream { } } + private void releaseRtp() { + if (rtpEndpoint != null) { + rtpEndpoint.release(new Continuation<Void>() { + @Override + public void onSuccess(Void result) throws Exception { + log.trace("PARTICIPANT {}: RtpEndpoint released successfully", KStream.this.uid); + } + + @Override + public void onError(Throwable cause) throws Exception { + log.warn("PARTICIPANT {}: Could not release RtpEndpoint", KStream.this.uid, cause); + } + }); + rtpEndpoint = null; + } + sipProcessor.ifPresent(SipStackProcessor::destroy); + } + public void addCandidate(IceCandidate candidate, String uid) { if (this.uid.equals(uid)) { if (outgoingMedia == null) { @@ -462,8 +497,12 @@ public class KStream extends AbstractStream { return connectedSince; } - public KRoom getRoom() { - return room; + public KRoom getKRoom() { + return kRoom; + } + + public Long getRoomId() { + return kRoom.getRoom().getId(); } MediaPipeline getPipeline() { @@ -500,8 +539,14 @@ public class KStream extends AbstractStream { @Override public String toString() { - return "KStream [room=" + room + ", streamType=" + streamType + ", profile=" + profile + ", recorder=" + return "KStream [kRoom=" + kRoom + ", streamType=" + streamType + ", profile=" + profile + ", recorder=" + recorder + ", outgoingMedia=" + outgoingMedia + ", listeners=" + listeners + ", flowoutFuture=" + flowoutFuture + ", chunkId=" + chunkId + ", type=" + type + ", sid=" + sid + ", uid=" + uid + "]"; } + + @Override + public void onRegister() { + rtpEndpoint 
= getRtpEndpoint(pipeline); + sipProcessor.get().invite(kRoom.getRoom(), sdpOffer); + } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java index 12580c2..ce9b19c 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/KurentoHandler.java @@ -459,7 +459,7 @@ public class KurentoHandler { if (kuid.equals(inKuid)) { KStream stream = streamProcessor.getByUid(tags.get(TAG_STREAM_UID)); if (stream != null) { - if (stream.getRoom().getRoomId().equals(Long.valueOf(tags.get(TAG_ROOM))) + if (stream.getRoomId().equals(Long.valueOf(tags.get(TAG_ROOM))) && stream.getPipeline().getId().equals(pipe.getId())) { return; diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java index 4855d86..300f77a 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/remote/StreamProcessor.java @@ -310,27 +310,27 @@ public class StreamProcessor implements IStreamProcessor { if (!kHandler.isConnected()) { return; } - KRoom room = kHandler.getRoom(roomId); - if (room.isSharing() && cm.streamByRoom(roomId) + KRoom kRoom = kHandler.getRoom(roomId); + if (kRoom.isSharing() && cm.streamByRoom(roomId) .flatMap(c -> c.getStreams().stream()) .filter(sd -> StreamType.SCREEN == sd.getType()) .findAny() .isEmpty()) { log.info("No more screen streams in the room, stopping sharing"); - room.stopSharing(); - if (Room.Type.INTERVIEW != room.getType() && room.isRecording()) { + kRoom.stopSharing(); + if (Room.Type.INTERVIEW != kRoom.getRoom().getType() && kRoom.isRecording()) { log.info("No more screen streams 
in the non-interview room, stopping recording"); - room.stopRecording(null); + kRoom.stopRecording(null); } } - if (room.isRecording() && cm.streamByRoom(roomId) + if (kRoom.isRecording() && cm.streamByRoom(roomId) .flatMap(c -> c.getStreams().stream()) .findAny() .isEmpty()) { log.info("No more streams in the room, stopping recording"); - room.stopRecording(null); + kRoom.stopRecording(null); } } @@ -518,7 +518,7 @@ public class StreamProcessor implements IStreamProcessor { Stream<KStream> getByRoom(Long roomId) { return streamByUid.values().stream() - .filter(stream -> stream.getRoom() != null && stream.getRoom().getRoomId().equals(roomId)); + .filter(stream -> stream.getRoomId().equals(roomId)); } Client getBySid(String sid) { diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/ISipCallbacks.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/ISipCallbacks.java new file mode 100644 index 0000000..a11fc98 --- /dev/null +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/ISipCallbacks.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.openmeetings.core.sip; + +public interface ISipCallbacks { + void onRegister(); +} diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java index 167b800..4c42e88 100644 --- a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipManager.java @@ -18,43 +18,13 @@ */ package org.apache.openmeetings.core.sip; -import static javax.sip.message.Request.INVITE; -import static javax.sip.message.Request.REGISTER; -import static javax.sip.message.Response.OK; -import static javax.sip.message.Response.RINGING; -import static javax.sip.message.Response.TRYING; -import static javax.sip.message.Response.UNAUTHORIZED; import static org.apache.openmeetings.util.OmFileHelper.SIP_USER_ID; import static org.apache.openmeetings.util.OpenmeetingsVariables.isSipEnabled; -import java.text.ParseException; -import java.util.List; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; +import java.util.Optional; import java.util.function.Function; import javax.annotation.PostConstruct; -import javax.sip.ClientTransaction; -import javax.sip.DialogTerminatedEvent; -import javax.sip.IOExceptionEvent; -import javax.sip.ListeningPoint; -import javax.sip.RequestEvent; -import javax.sip.ResponseEvent; -import javax.sip.ServerTransaction; -import javax.sip.SipException; -import javax.sip.SipFactory; -import javax.sip.SipProvider; -import javax.sip.TimeoutEvent; -import javax.sip.TransactionTerminatedEvent; -import javax.sip.address.Address; -import javax.sip.address.AddressFactory; -import javax.sip.header.ContactHeader; -import javax.sip.header.HeaderFactory; -import javax.sip.message.MessageFactory; -import javax.sip.message.Request; -import javax.sip.message.Response; 
import org.apache.openmeetings.db.entity.room.Room; import org.apache.openmeetings.db.entity.user.User; @@ -80,25 +50,13 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import gov.nist.javax.sip.DialogTimeoutEvent; -import gov.nist.javax.sip.SipListenerExt; -import gov.nist.javax.sip.SipStackExt; -import gov.nist.javax.sip.address.SipUri; -import gov.nist.javax.sip.clientauthutils.AuthenticationHelper; -import gov.nist.javax.sip.clientauthutils.UserCredentials; -import gov.nist.javax.sip.stack.NioMessageProcessorFactory; - @Service -public class SipManager implements ISipManager, SipListenerExt { +public class SipManager implements ISipManager { private static final Logger log = LoggerFactory.getLogger(SipManager.class); public static final String ASTERISK_OM_FAMILY = "openmeetings"; public static final String ASTERISK_OM_KEY = "rooms"; public static final String SIP_FIRST_NAME = "SIP Transport"; public static final String SIP_USER_NAME = "--SIP--"; - private static final String SIP_TRANSPORT = "ws"; - private static final <T> Consumer<T> noop() { - return t -> {}; - } @Value("${sip.hostname}") private String sipHostname; @@ -111,70 +69,30 @@ public class SipManager implements ISipManager, SipListenerExt { @Value("${sip.manager.timeout}") private long managerTimeout; - @Value("${sip.ws.local.port}") - private int localWsPort = 6666; - @Value("${sip.ws.local.host}") - private String localWsHost; @Value("${sip.ws.remote.port}") private int wsPort; @Value("${sip.ws.remote.user}") private String omSipUser; @Value("${sip.ws.remote.password}") private String omSipPasswd; + @Value("${sip.ws.local.host}") + private String localWsHost; + @Value("${sip.ws.local.port.min}") + private int minLocalWsPort = 6666; + @Value("${sip.ws.local.port.max}") + private int maxLocalWsPort = 7666; - private final AtomicLong cseq = new AtomicLong(); - private final Random rnd = new Random(); - - 
private String tag; - private String branch; - - private SipProvider sipProvider; - private SipStackExt sipStack; - private MessageFactory messageFactory; - private HeaderFactory headerFactory; - private AddressFactory addressFactory; - private ContactHeader contactHeader; private ManagerConnectionFactory factory; private String sipUserPicture; @PostConstruct - public void init() throws Exception { + public void init() { if (!Strings.isEmpty(sipHostname)) { factory = new ManagerConnectionFactory( sipHostname , managerPort , managerUser , managerPass); - final SipFactory sipFactory = SipFactory.getInstance(); - sipFactory.setPathName("gov.nist"); - - final Properties properties = new Properties(); - properties.setProperty("javax.sip.STACK_NAME", "stack"); - //properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "32"); - properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true"); - properties.setProperty("gov.nist.javax.sip.MESSAGE_PROCESSOR_FACTORY", NioMessageProcessorFactory.class.getName()); - sipStack = (SipStackExt) sipFactory.createSipStack(properties); - tag = getRnd(10); - branch = getRnd(14); - - messageFactory = sipFactory.createMessageFactory(); - headerFactory = sipFactory.createHeaderFactory(); - addressFactory = sipFactory.createAddressFactory(); - final ListeningPoint listeningPoint = sipStack.createListeningPoint( - localWsHost - , localWsPort - , SIP_TRANSPORT); - sipProvider = sipStack.createSipProvider(listeningPoint); - sipProvider.addSipListener(this); - Address contact = createAddr(omSipUser, localWsHost, uri -> { - try { - uri.setPort(localWsPort); - uri.setTransportParam(SIP_TRANSPORT); - } catch (ParseException e) { - log.error("fail to create contact address", e); - } - }); - contactHeader = headerFactory.createContactHeader(contact); } } @@ -241,7 +159,7 @@ public class SipManager implements ISipManager, SipListenerExt { return ASTERISK_OM_KEY + "/" + confno; } - private static String getSipNumber(Room r) { + static 
String getSipNumber(Room r) { return (r != null && r.getConfno() != null) ? r.getConfno() : null; } @@ -330,203 +248,31 @@ public class SipManager implements ISipManager, SipListenerExt { return u; } - @Override - public void processDialogTerminated(DialogTerminatedEvent evt) { - log.error("processDialogTerminated: \n{}", evt); - } - - @Override - public void processIOException(IOExceptionEvent evt) { - log.error("processIOException: \n{}", evt); - } - - @Override - public void processTimeout(TimeoutEvent evt) { - log.error("processTimeout: \n{}", evt); - } - - @Override - public void processTransactionTerminated(TransactionTerminatedEvent evt) { - log.error("processTransactionTerminated: \n{}", evt); - } - - @Override - public void processDialogTimeout(DialogTimeoutEvent timeoutEvent) { - log.error("processDialogTimeout: \n{}", timeoutEvent); - } - - @Override - public void processRequest(RequestEvent evt) { - log.debug("processRequest: \n\n{}", evt.getRequest()); - Request rq = evt.getRequest(); - String method = rq.getMethod(); - try { - if (Request.OPTIONS.equals(method)) { - ServerTransaction transaction = sipProvider.getNewServerTransaction(rq); - Response resp = messageFactory.createResponse(200, rq); - resp.addHeader(contactHeader); - transaction.sendResponse(resp); - } - } catch (Exception e) { - log.error("processRequest", e); - } - } - - @Override - public void processResponse(ResponseEvent evt) { - Response resp = evt.getResponse(); - ClientTransaction curTrans = evt.getClientTransaction(); - Request prevReq = curTrans.getRequest(); - log.warn("Response code: {} on {}", resp.getStatusCode(), prevReq.getMethod()); - switch (resp.getStatusCode()) { - case UNAUTHORIZED: - processUnauth(evt); - break; - case OK: - break; - case TRYING: - case RINGING: - break; - // FIXME TODO other codes: 404 - default: - log.debug("No handler for response: \n\n{}", resp); - } + String getSipHostname() { + return sipHostname; } - private String getRnd(int count) { - return 
rnd.ints('0', 'z' + 1) - .filter(ch -> (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z')) - .limit(count) - .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) - .toString(); + int getWsPort() { + return wsPort; } - private Address createAddr(String user) { - return createAddr(user, sipHostname, noop()); + String getOmSipUser() { + return omSipUser; } - private Address createAddr(String user, String host, Consumer<SipUri> cons) { - try { - SipUri uri = new SipUri(); - uri.setHost(host); - uri.setUser(user); - cons.accept(uri); - return addressFactory.createAddress(user, uri); - } catch (ParseException e) { - log.error("fail to create address", e); - } - return null; + String getOmSipPasswd() { + return omSipPasswd; } - private void sendRequest(String method, String to, Consumer<SipUri> uriCons, Consumer<Request> reqCons) throws Exception { - SipUri uri = new SipUri(); - uri.setHost(sipHostname); - uri.setPort(wsPort); - uri.setTransportParam(SIP_TRANSPORT); - uri.setMethodParam("GET"); - uri.setHeader("Host", sipHostname); - uri.setHeader("Location", "/ws"); - uriCons.accept(uri); - - Request request = messageFactory.createRequest( - uri - , method - , sipProvider.getNewCallId() - , headerFactory.createCSeqHeader(cseq.incrementAndGet(), method) - , headerFactory.createFromHeader(createAddr(omSipUser), tag) - , headerFactory.createToHeader(createAddr(to), null) - , List.of(headerFactory.createViaHeader(localWsHost, localWsPort, SIP_TRANSPORT, branch)) - , headerFactory.createMaxForwardsHeader(70)); - request.addHeader(contactHeader); - request.addHeader(headerFactory.createExpiresHeader(600)); - - reqCons.accept(request); - - log.debug("sendRequest: \n\n{}", request); - - ClientTransaction trans = sipProvider.getNewClientTransaction(request); - trans.sendRequest(); + String getLocalWsHost() { + return localWsHost; } - private void processUnauth(ResponseEvent evt) { - Response resp = evt.getResponse(); - ClientTransaction curTrans = 
evt.getClientTransaction(); - AuthenticationHelper helper = sipStack.getAuthenticationHelper((trans, s) -> new UserCredentials() { - @Override - public String getUserName() { - return omSipUser; - } - - @Override - public String getPassword() { - return omSipPasswd; - } - - @Override - public String getSipDomain() { - return "asterisk"; - } - }, headerFactory); - try { - ClientTransaction trans = helper.handleChallenge(resp, curTrans, sipProvider, 5); - trans.sendRequest(); - } catch (SipException e) { - log.error("Error while sending AUTH", e); - } - } - - private void addAllow(Request req) throws ParseException { - req.addHeader(headerFactory.createAllowHeader("ACK,CANCEL,INVITE,MESSAGE,BYE,OPTIONS,INFO,NOTIFY,REFER")); - } - - private void register() throws Exception { - sendRequest( - REGISTER - , omSipUser - , noop() - , req -> { - try { - addAllow(req); - } catch (ParseException e) { - log.error("fail to create allow header", e); - } - }); - } - - private void invite(Room r, String sdp) throws Exception { - final String sipNumber = getSipNumber(r); - if (sipNumber == null) { - log.warn("Failed to get SIP number for room: {}", r); - return; + public Optional<SipStackProcessor> createSipStackProcessor(String name, Room r, ISipCallbacks callbacks) throws Exception { + if (factory == null || !isSipEnabled() || !r.isSipEnabled()) { + log.warn("Asterisk is not configured or denied in room #{}", r.getId()); + return Optional.empty(); } - sendRequest( - INVITE - , sipNumber - , uri -> uri.setUser(sipNumber) - , req -> { - /* - * ContentLengthHeader contentLength = - * this.headerFactory.createContentLengthHeader(300); - * ContentTypeHeader contentType = - * this.headerFactory.createContentTypeHeader("application", - * "sdp"); - * - * String sdpData = "v=0\n" + - * "o=user1 392867480 292042336 IN IP4 xx.xx.xx.xx\n" + "s=-\n" - * + "c=IN IP4 xx.xx.xx.xx\n" + "t=0 0\n" + - * "m=audio 8000 RTP/AVP 0 8 101\n" + "a=rtpmap:0 PCMU/8000\n" + - * "a=rtpmap:8 PCMA/8000\n" + - * 
"a=rtpmap:101 telephone-event/8000\n" + "a=sendrecv"; byte[] - * contents = sdpData.getBytes(); this.contactHeader = - * this.headerFactory.createContactHeader(contactAddress); - */ - try { - addAllow(req); - req.addHeader(headerFactory.createContentLengthHeader(sdp.length())); - req.setContent(sdp, headerFactory.createContentTypeHeader("application", "sdp")); - } catch (Exception e) { - log.error("fail to create allow header", e); - } - }); + return Optional.of(new SipStackProcessor(this, name, minLocalWsPort, callbacks)); /// FIXME TODO } } diff --git a/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipStackProcessor.java b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipStackProcessor.java new file mode 100644 index 0000000..483f148 --- /dev/null +++ b/openmeetings-core/src/main/java/org/apache/openmeetings/core/sip/SipStackProcessor.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.openmeetings.core.sip; + +import static javax.sip.message.Request.INVITE; +import static javax.sip.message.Request.REGISTER; +import static javax.sip.message.Response.OK; +import static javax.sip.message.Response.RINGING; +import static javax.sip.message.Response.TRYING; +import static javax.sip.message.Response.UNAUTHORIZED; +import static org.apache.openmeetings.core.sip.SipManager.getSipNumber; + +import java.text.ParseException; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import javax.sip.ClientTransaction; +import javax.sip.DialogTerminatedEvent; +import javax.sip.IOExceptionEvent; +import javax.sip.ListeningPoint; +import javax.sip.RequestEvent; +import javax.sip.ResponseEvent; +import javax.sip.ServerTransaction; +import javax.sip.SipException; +import javax.sip.SipFactory; +import javax.sip.SipProvider; +import javax.sip.TimeoutEvent; +import javax.sip.TransactionTerminatedEvent; +import javax.sip.address.Address; +import javax.sip.address.AddressFactory; +import javax.sip.header.ContactHeader; +import javax.sip.header.HeaderFactory; +import javax.sip.message.MessageFactory; +import javax.sip.message.Request; +import javax.sip.message.Response; + +import org.apache.openmeetings.db.entity.room.Room; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import gov.nist.javax.sip.DialogTimeoutEvent; +import gov.nist.javax.sip.SipListenerExt; +import gov.nist.javax.sip.SipStackExt; +import gov.nist.javax.sip.SipStackImpl; +import gov.nist.javax.sip.address.SipUri; +import gov.nist.javax.sip.clientauthutils.AuthenticationHelper; +import gov.nist.javax.sip.clientauthutils.UserCredentials; +import gov.nist.javax.sip.stack.NioMessageProcessorFactory; + +public class SipStackProcessor implements SipListenerExt { + private static final Logger log = LoggerFactory.getLogger(SipStackProcessor.class); + private 
static final String SIP_TRANSPORT = "ws"; + private static final <T> Consumer<T> noop() { + return t -> {}; + } + + private final SipManager manager; + private final ISipCallbacks callbacks; + private final String sipHost; + private final int wsPort; + private final String omSipUser; + private final String omSipPasswd; + private final String localWsHost; + private final int localWsPort; + private final String tag; + + private final SipProvider sipProvider; + private final SipStackExt sipStack; + private final MessageFactory messageFactory; + private final HeaderFactory headerFactory; + private final AddressFactory addressFactory; + private final ContactHeader contactHeader; + private final AtomicLong cseq = new AtomicLong(); + private final Random rnd = new Random(); + + SipStackProcessor(SipManager manager, String name, int localWsPort, ISipCallbacks callbacks) throws Exception { + this.manager = manager; + this.callbacks = callbacks; + this.sipHost = manager.getSipHostname(); + this.wsPort = manager.getWsPort(); + this.omSipUser = manager.getOmSipUser(); + this.omSipPasswd = manager.getOmSipPasswd(); + this.localWsHost = manager.getLocalWsHost(); + this.localWsPort = localWsPort; + + final SipFactory sipFactory = SipFactory.getInstance(); + + final Properties properties = new Properties(); + properties.setProperty("javax.sip.STACK_NAME", name); + //properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "32"); + properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true"); + properties.setProperty("gov.nist.javax.sip.MESSAGE_PROCESSOR_FACTORY", NioMessageProcessorFactory.class.getName()); + sipStack = new SipStackImpl(properties); + tag = getRnd(10); + + messageFactory = sipFactory.createMessageFactory(); + headerFactory = sipFactory.createHeaderFactory(); + addressFactory = sipFactory.createAddressFactory(); + final ListeningPoint listeningPoint = sipStack.createListeningPoint( + localWsHost + , localWsPort + , SIP_TRANSPORT); + sipProvider = 
sipStack.createSipProvider(listeningPoint); + sipProvider.addSipListener(this); + Address contact = createAddr(omSipUser, localWsHost, uri -> { + try { + uri.setPort(localWsPort); + uri.setTransportParam(SIP_TRANSPORT); + } catch (ParseException e) { + log.error("fail to create contact address", e); + } + }); + contactHeader = headerFactory.createContactHeader(contact); + } + + @Override + public void processDialogTerminated(DialogTerminatedEvent evt) { + log.error("processDialogTerminated: \n{}", evt); + } + + @Override + public void processIOException(IOExceptionEvent evt) { + log.error("processIOException: \n{}", evt); + } + + @Override + public void processTimeout(TimeoutEvent evt) { + log.error("processTimeout: \n{}", evt); + } + + @Override + public void processTransactionTerminated(TransactionTerminatedEvent evt) { + log.error("processTransactionTerminated: \n{}", evt); + } + + @Override + public void processDialogTimeout(DialogTimeoutEvent timeoutEvent) { + log.error("processDialogTimeout: \n{}", timeoutEvent); + } + + @Override + public void processRequest(RequestEvent evt) { + log.debug("processRequest: \n\n{}", evt.getRequest()); + Request rq = evt.getRequest(); + String method = rq.getMethod(); + try { + if (Request.OPTIONS.equals(method)) { + ServerTransaction transaction = sipProvider.getNewServerTransaction(rq); + Response resp = messageFactory.createResponse(200, rq); + resp.addHeader(contactHeader); + transaction.sendResponse(resp); + } + } catch (Exception e) { + log.error("processRequest", e); + } + } + + @Override + public void processResponse(ResponseEvent evt) { + Response resp = evt.getResponse(); + ClientTransaction curTrans = evt.getClientTransaction(); + Request prevReq = curTrans.getRequest(); + log.warn("Response code: {} on {}", resp.getStatusCode(), prevReq.getMethod()); + switch (resp.getStatusCode()) { + case UNAUTHORIZED: + processUnauth(evt); + break; + case OK: + if (REGISTER.equals(prevReq.getMethod())) { + 
callbacks.onRegister(); + } + break; + case TRYING: + case RINGING: + break; + // FIXME TODO other codes: 404 + default: + log.debug("No handler for response: \n\n{}", resp); + } + } + + private String getRnd(int count) { + return rnd.ints('0', 'z' + 1) + .filter(ch -> (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'z')) + .limit(count) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) + .toString(); + } + + private Address createAddr(String user) { + return createAddr(user, sipHost, noop()); + } + + private Address createAddr(String user, String host, Consumer<SipUri> cons) { + try { + SipUri uri = new SipUri(); + uri.setHost(host); + uri.setUser(user); + cons.accept(uri); + return addressFactory.createAddress(user, uri); + } catch (ParseException e) { + log.error("fail to create address", e); + } + return null; + } + + private void sendRequest(String method, String to, Consumer<SipUri> uriCons, Consumer<Request> reqCons) { + try { + SipUri uri = new SipUri(); + uri.setHost(sipHost); + uri.setPort(wsPort); + uri.setTransportParam(SIP_TRANSPORT); + uri.setMethodParam("GET"); + uri.setHeader("Host", sipHost); + uri.setHeader("Location", "/ws"); + uriCons.accept(uri); + + Request request = messageFactory.createRequest( + uri + , method + , sipProvider.getNewCallId() + , headerFactory.createCSeqHeader(cseq.incrementAndGet(), method) + , headerFactory.createFromHeader(createAddr(omSipUser), tag) + , headerFactory.createToHeader(createAddr(to), null) + , List.of(headerFactory.createViaHeader(localWsHost, localWsPort, SIP_TRANSPORT, null)) + , headerFactory.createMaxForwardsHeader(70)); + request.addHeader(contactHeader); + request.addHeader(headerFactory.createExpiresHeader(600)); + + reqCons.accept(request); + + log.debug("sendRequest: \n\n{}", request); + + ClientTransaction trans = sipProvider.getNewClientTransaction(request); + trans.sendRequest(); + } catch (Exception e) { + log.error("Error while sending request", e); + } + } + 
+ private void processUnauth(ResponseEvent evt) { + Response resp = evt.getResponse(); + ClientTransaction curTrans = evt.getClientTransaction(); + AuthenticationHelper helper = sipStack.getAuthenticationHelper((trans, s) -> new UserCredentials() { + @Override + public String getUserName() { + return omSipUser; + } + + @Override + public String getPassword() { + return omSipPasswd; + } + + @Override + public String getSipDomain() { + return "asterisk"; + } + }, headerFactory); + try { + ClientTransaction trans = helper.handleChallenge(resp, curTrans, sipProvider, 5); + trans.sendRequest(); + } catch (SipException e) { + log.error("Error while sending AUTH", e); + } + } + + private void addAllow(Request req) throws ParseException { + req.addHeader(headerFactory.createAllowHeader("ACK,CANCEL,INVITE,MESSAGE,BYE,OPTIONS,INFO,NOTIFY,REFER")); + } + + public void register() { + sendRequest( + REGISTER + , omSipUser + , noop() + , req -> { + try { + addAllow(req); + } catch (ParseException e) { + log.error("fail to create allow header", e); + } + }); + } + + public void invite(Room r, String sdp) { + final String sipNumber = getSipNumber(r); + if (sipNumber == null) { + log.warn("Failed to get SIP number for room: {}", r); + return; + } + sendRequest( + INVITE + , sipNumber + , uri -> uri.setUser(sipNumber) + , req -> { + try { + addAllow(req); + req.addHeader(headerFactory.createContentLengthHeader(sdp.length())); + req.setContent(sdp, headerFactory.createContentTypeHeader("application", "sdp")); + } catch (Exception e) { + log.error("fail to create allow header", e); + } + }); + } + + public void destroy() { + sipStack.stop(); + } +} diff --git a/openmeetings-db/src/main/java/org/apache/openmeetings/db/entity/user/AsteriskSipUser.java b/openmeetings-db/src/main/java/org/apache/openmeetings/db/entity/user/AsteriskSipUser.java index 7e9e467..1e9404c 100644 --- a/openmeetings-db/src/main/java/org/apache/openmeetings/db/entity/user/AsteriskSipUser.java +++ 
b/openmeetings-db/src/main/java/org/apache/openmeetings/db/entity/user/AsteriskSipUser.java @@ -125,9 +125,13 @@ public class AsteriskSipUser implements Serializable { @XmlElement(name = "callbackextension", required = false) private String callbackextension; + @Column(name = "transport", nullable = false, length = 100) + @XmlElement(name = "transport", required = false) + private String transport = "wss,ws,udp"; + @Column(name = "allow", nullable = false, length = 100) @XmlElement(name = "allow", required = false) - private String allow = "ulaw;alaw;h264"; + private String allow = "!all,ulaw,opus,vp8"; public long getId() { return id; @@ -272,4 +276,12 @@ public class AsteriskSipUser implements Serializable { public void setAllow(String allow) { this.allow = allow; } + + public String getTransport() { + return transport; + } + + public void setTransport(String transport) { + this.transport = transport; + } } diff --git a/openmeetings-server/src/site/markdown/AsteriskIntegration.md b/openmeetings-server/src/site/markdown/AsteriskIntegration.md index 554584f..0b98140 100644 --- a/openmeetings-server/src/site/markdown/AsteriskIntegration.md +++ b/openmeetings-server/src/site/markdown/AsteriskIntegration.md @@ -139,8 +139,7 @@ avpf=yes icesupport=yes directmedia=no disallow=all -allow=ulaw,opus -allow=vp8 +allow=!all,ulaw,opus,vp8 ``` ### Configure extensions: @@ -282,34 +281,4 @@ type=transport protocol=wss bind=0.0.0.0 ; All other transport parameters are ignored for wss transports. - -[webrtc_client] -type=aor -max_contacts=5 -remove_existing=yes - -[webrtc_client] -type=auth -auth_type=userpass -username=webrtc_client -password=webrtc_client ; This is a completely insecure password! Do NOT expose this - ; system to the Internet without utilizing a better password. 
- -[webrtc_client] -type=endpoint -aors=webrtc_client -auth=webrtc_client -dtls_auto_generate_cert=yes -webrtc=yes -; Setting webrtc=yes is a shortcut for setting the following options: -; use_avpf=yes -; media_encryption=dtls -; dtls_verify=fingerprint -; dtls_setup=actpass -; ice_support=yes -; media_use_received_transport=yes -; rtcp_mux=yes -context=default -disallow=all -allow=opus,ulaw ``` diff --git a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/KStreamDto.java b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/KStreamDto.java index 5f87f68..90cc2b7 100644 --- a/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/KStreamDto.java +++ b/openmeetings-web/src/main/java/org/apache/openmeetings/web/admin/connection/KStreamDto.java @@ -50,7 +50,7 @@ public class KStreamDto implements IDataProviderEntity { public KStreamDto(KStream kStream) { this.sid = kStream.getSid(); this.uid = kStream.getUid(); - this.roomId = (kStream.getRoom() == null) ? null : kStream.getRoom().getRoomId(); + this.roomId = kStream.getRoomId(); this.connectedSince = kStream.getConnectedSince(); this.streamType = kStream.getStreamType(); this.profile = kStream.getProfile().toString();